Do not add new Inotify watchers on a timer
[ganeti-github.git] / lib / jqueue / __init__.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2014 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31 """Module implementing the job queue handling.
32
33 Locking: there's a single, large lock in the L{JobQueue} class. It's
34 used by all other classes in this module.
35
36 @var JOBQUEUE_THREADS: the number of worker threads we start for
37 processing jobs
38
39 """
40
41 import logging
42 import errno
43 import time
44 import weakref
45 import threading
46 import itertools
47 import operator
48 import os
49
50 try:
51 # pylint: disable=E0611
52 from pyinotify import pyinotify
53 except ImportError:
54 import pyinotify
55
56 from ganeti import asyncnotifier
57 from ganeti import constants
58 from ganeti import serializer
59 from ganeti import workerpool
60 from ganeti import locking
61 from ganeti import luxi
62 from ganeti import opcodes
63 from ganeti import opcodes_base
64 from ganeti import errors
65 from ganeti import mcpu
66 from ganeti import utils
67 from ganeti import jstore
68 import ganeti.rpc.node as rpc
69 from ganeti import runtime
70 from ganeti import netutils
71 from ganeti import compat
72 from ganeti import ht
73 from ganeti import query
74 from ganeti import qlang
75 from ganeti import pathutils
76 from ganeti import vcluster
77 from ganeti.cmdlib import cluster
78
79
#: Number of worker threads started for processing jobs (see module docstring)
JOBQUEUE_THREADS = 1

# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"

#: Retrieves "id" attribute
_GetIdAttr = operator.attrgetter("id")
88
89
class CancelJob(Exception):
  """Exception raised internally to abort the execution of a job.

  """
95
def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time, split into (seconds, microseconds)

  """
  now = time.time()
  return utils.SplitTime(now)
104
105
def _CallJqUpdate(runner, names, file_name, content):
  """Updates a job queue file on the given nodes, using a virtualized path.

  @param runner: RPC runner providing C{call_jobqueue_update}
  @param names: nodes to update
  @param file_name: physical path of the queue file
  @param content: new file contents

  """
  virtual_path = vcluster.MakeVirtualPath(file_name)
  return runner.call_jobqueue_update(names, virtual_path, content)
112
113
114 class _QueuedOpCode(object):
115 """Encapsulates an opcode object.
116
117 @ivar log: holds the execution log and consists of tuples
118 of the form C{(log_serial, timestamp, level, message)}
119 @ivar input: the OpCode we encapsulate
120 @ivar status: the current status
121 @ivar result: the result of the LU execution
122 @ivar start_timestamp: timestamp for the start of the execution
123 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
124 @ivar stop_timestamp: timestamp for the end of the execution
125
126 """
127 __slots__ = ["input", "status", "result", "log", "priority",
128 "start_timestamp", "exec_timestamp", "end_timestamp",
129 "__weakref__"]
130
131 def __init__(self, op):
132 """Initializes instances of this class.
133
134 @type op: L{opcodes.OpCode}
135 @param op: the opcode we encapsulate
136
137 """
138 self.input = op
139 self.status = constants.OP_STATUS_QUEUED
140 self.result = None
141 self.log = []
142 self.start_timestamp = None
143 self.exec_timestamp = None
144 self.end_timestamp = None
145
146 # Get initial priority (it might change during the lifetime of this opcode)
147 self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
148
149 @classmethod
150 def Restore(cls, state):
151 """Restore the _QueuedOpCode from the serialized form.
152
153 @type state: dict
154 @param state: the serialized state
155 @rtype: _QueuedOpCode
156 @return: a new _QueuedOpCode instance
157
158 """
159 obj = _QueuedOpCode.__new__(cls)
160 obj.input = opcodes.OpCode.LoadOpCode(state["input"])
161 obj.status = state["status"]
162 obj.result = state["result"]
163 obj.log = state["log"]
164 obj.start_timestamp = state.get("start_timestamp", None)
165 obj.exec_timestamp = state.get("exec_timestamp", None)
166 obj.end_timestamp = state.get("end_timestamp", None)
167 obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
168 return obj
169
170 def Serialize(self):
171 """Serializes this _QueuedOpCode.
172
173 @rtype: dict
174 @return: the dictionary holding the serialized state
175
176 """
177 return {
178 "input": self.input.__getstate__(),
179 "status": self.status,
180 "result": self.result,
181 "log": self.log,
182 "start_timestamp": self.start_timestamp,
183 "exec_timestamp": self.exec_timestamp,
184 "end_timestamp": self.end_timestamp,
185 "priority": self.priority,
186 }
187
188
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar writable: Whether the job is allowed to be modified

  """
  # pylint: disable=W0212
  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "processor_lock", "writable", "archived",
               "livelock", "process_id",
               "__weakref__"]

  def AddReasons(self, pickup=False):
    """Extend the reason trail

    Add the reason for all the opcodes of this job to be executed.

    @type pickup: bool
    @param pickup: whether the job is being picked up again after a
      daemon restart (changes the recorded reason source)

    """
    count = 0
    for queued_op in self.ops:
      op = queued_op.input
      if pickup:
        reason_src_prefix = constants.OPCODE_REASON_SRC_PICKUP
      else:
        reason_src_prefix = constants.OPCODE_REASON_SRC_OPCODE
      reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__,
                                                reason_src_prefix)
      reason_text = "job=%d;index=%d" % (self.id, count)
      reason = getattr(op, "reason", [])
      reason.append((reason_src, reason_text, utils.EpochNano()))
      op.reason = reason
      count = count + 1

  def __init__(self, queue, job_id, ops, writable):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes
    @type writable: bool
    @param writable: Whether job can be modified

    """
    if not ops:
      raise errors.GenericError("A job needs at least one opcode")

    self.queue = queue
    self.id = int(job_id)
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.AddReasons()
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None
    self.archived = False
    self.livelock = None
    self.process_id = None

    self._InitInMemory(self, writable)

    assert not self.archived, "New jobs can not be marked as archived"

  @staticmethod
  def _InitInMemory(obj, writable):
    """Initializes in-memory variables.

    """
    obj.writable = writable
    obj.ops_iter = None
    obj.cur_opctx = None

    # Read-only jobs are not processed and therefore don't need a lock
    if writable:
      obj.processor_lock = threading.Lock()
    else:
      obj.processor_lock = None

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state, writable, archived):
    """Restore a _QueuedJob from serialized state:

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @type writable: bool
    @param writable: Whether job can be modified
    @type archived: bool
    @param archived: Whether job was already archived
    @rtype: _JobQueue
    @return: the restored _JobQueue instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = int(state["id"])
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    obj.archived = archived
    obj.livelock = state.get("livelock", None)
    obj.process_id = state.get("process_id", None)
    if obj.process_id is not None:
      obj.process_id = int(obj.process_id)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    cls._InitInMemory(obj, writable)

    return obj

  def Serialize(self):
    """Serialize the _JobQueue instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      "livelock": self.livelock,
      "process_id": self.process_id,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITING:
        status = constants.JOB_STATUS_WAITING
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        # Use the job status constant here for consistency with the other
        # branches; the OP_STATUS_CANCELED and JOB_STATUS_CANCELED values
        # coincide, but job statuses come from the JOB_STATUS_* namespace
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def CalcPriority(self):
    """Gets the current priority for this job.

    Only unfinished opcodes are considered. When all are done, the default
    priority is used.

    @rtype: int

    """
    priorities = [op.priority for op in self.ops
                  if op.status not in constants.OPS_FINALIZED]

    if not priorities:
      # All opcodes are done, assume default priority
      return constants.OP_PRIO_DEFAULT

    return min(priorities)

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is an utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False

  def Finalize(self):
    """Marks the job as finalized.

    """
    self.end_timestamp = TimeStampNow()

  def Cancel(self):
    """Marks job as canceled/-ing if possible.

    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job was successfully canceled or marked
      as canceling and a text message

    """
    status = self.CalcStatus()

    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()
      return (True, "Job %s canceled" % self.id)

    elif status == constants.JOB_STATUS_WAITING:
      # The worker will notice the new status and cancel the job
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      return (True, "Job %s will be canceled" % self.id)

    else:
      logging.debug("Job %s is no longer waiting in the queue", self.id)
      return (False, "Job %s is no longer waiting in the queue" % self.id)

  def ChangePriority(self, priority):
    """Changes the job priority.

    @type priority: int
    @param priority: New priority
    @rtype: tuple; (bool, string)
    @return: Boolean describing whether job's priority was successfully changed
      and a text message

    """
    status = self.CalcStatus()

    if status in constants.JOBS_FINALIZED:
      return (False, "Job %s is finished" % self.id)
    elif status == constants.JOB_STATUS_CANCELING:
      return (False, "Job %s is cancelling" % self.id)
    else:
      assert status in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_WAITING,
                        constants.JOB_STATUS_RUNNING)

      changed = False
      for op in self.ops:
        if (op.status == constants.OP_STATUS_RUNNING or
            op.status in constants.OPS_FINALIZED):
          assert not changed, \
            ("Found opcode for which priority should not be changed after"
             " priority has been changed for previous opcodes")
          continue

        assert op.status in (constants.OP_STATUS_QUEUED,
                             constants.OP_STATUS_WAITING)

        changed = True

        # Set new priority (doesn't modify opcode input)
        op.priority = priority

      if changed:
        return (True, ("Priorities of pending opcodes for job %s have been"
                       " changed to %s" % (self.id, priority)))
      else:
        return (False, "Job %s had no pending opcodes" % self.id)

  def SetPid(self, pid):
    """Sets the job's process ID

    @type pid: int
    @param pid: the process ID

    """
    status = self.CalcStatus()

    if status in (constants.JOB_STATUS_QUEUED,
                  constants.JOB_STATUS_WAITING):
      if self.process_id is not None:
        logging.warning("Replacing the process id %s of job %s with %s",
                        self.process_id, self.id, pid)
      self.process_id = pid
    else:
      logging.warning("Can set pid only for queued/waiting jobs")
553
554
class _OpExecCallbacks(mcpu.OpExecCbBase):
  """Callbacks used by the opcode processor to report execution progress.

  """
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    super(_OpExecCallbacks, self).__init__()

    for (value, what) in [(queue, "Queue"), (job, "Job"), (op, "Opcode")]:
      assert value, "%s is missing" % what

    self._queue = queue
    self._job = job
    self._op = op

  def _CheckCancel(self):
    """Raises L{CancelJob} if the opcode was asked to cancel.

    """
    if self._op.status == constants.OP_STATUS_CANCELING:
      logging.debug("Canceling opcode")
      raise CancelJob()

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITING.

    """
    assert self._op in self._job.ops
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    logging.debug("Opcode is now running")

    self._op.status = constants.OP_STATUS_RUNNING
    self._op.exec_timestamp = TimeStampNow()

    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyRetry(self):
    """Mark opcode again as lock-waiting.

    This is called from the mcpu code just after calling PrepareRetry.
    The opcode will now again acquire locks (more, hopefully).

    """
    self._op.status = constants.OP_STATUS_WAITING
    logging.debug("Opcode will be retried. Back to waiting.")

  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Appends a log entry to the job, holding the queue lock.

    """
    self._job.log_serial += 1
    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
    # Log entries need not be replicated to other nodes
    self._queue.UpdateJobUnlocked(self._job, replicate=False)

  def Feedback(self, *args):
    """Append a log entry.

    Accepts either C{(message)} or C{(log_type, message)} as arguments.

    """
    assert len(args) < 3

    if len(args) == 1:
      (log_type, log_msg) = (constants.ELOG_MESSAGE, args[0])
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())
    self._AppendFeedback(timestamp, log_type, log_msg)

  def CurrentPriority(self):
    """Returns current priority for opcode.

    """
    assert self._op.status in (constants.OP_STATUS_WAITING,
                               constants.OP_STATUS_CANCELING)

    # Cancel here if we were asked to
    self._CheckCancel()

    return self._op.priority

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{JobQueue.SubmitManyJobs}.

    """
    # Locking is done in job queue
    return self._queue.SubmitManyJobs(jobs)
669
670
def _EncodeOpError(err):
  """Encodes an error which occurred while processing an opcode.

  @param err: exception raised during opcode execution
  @return: the encoded exception, wrapped in L{errors.OpExecError} if it
    was not already a Ganeti error

  """
  if isinstance(err, errors.GenericError):
    return errors.EncodeException(err)

  return errors.EncodeException(errors.OpExecError(str(err)))
681
682
683 class _TimeoutStrategyWrapper:
684 def __init__(self, fn):
685 """Initializes this class.
686
687 """
688 self._fn = fn
689 self._next = None
690
691 def _Advance(self):
692 """Gets the next timeout if necessary.
693
694 """
695 if self._next is None:
696 self._next = self._fn()
697
698 def Peek(self):
699 """Returns the next timeout.
700
701 """
702 self._Advance()
703 return self._next
704
705 def Next(self):
706 """Returns the current timeout and advances the internal state.
707
708 """
709 self._Advance()
710 result = self._next
711 self._next = None
712 return result
713
714
class _OpExecContext:
  """Keeps track of one opcode's execution state within a job.

  """
  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
    """Initializes this class.

    @param op: the L{_QueuedOpCode} being executed
    @param index: position of the opcode within the job
    @param log_prefix: prefix used for log messages about this opcode
    @param timeout_strategy_factory: callable creating timeout strategies

    """
    self.op = op
    self.index = index
    self.log_prefix = log_prefix
    self.summary = op.input.Summary()

    # Work on a local copy so the opcode's dependency list stays untouched
    if getattr(op.input, opcodes_base.DEPEND_ATTR, None):
      self.jobdeps = list(op.input.depends)
    else:
      self.jobdeps = None

    self._timeout_strategy_factory = timeout_strategy_factory
    self._ResetTimeoutStrategy()

  def _ResetTimeoutStrategy(self):
    """Creates a new timeout strategy.

    """
    factory = self._timeout_strategy_factory
    self._timeout_strategy = _TimeoutStrategyWrapper(factory().NextAttempt)

  def CheckPriorityIncrease(self):
    """Checks whether priority can and should be increased.

    Called when locks couldn't be acquired.

    @rtype: bool
    @return: whether the priority was changed

    """
    op = self.op

    # Exhausted all retries and next round should not use blocking acquire
    # for locks?
    if (self._timeout_strategy.Peek() is None and
        op.priority > constants.OP_PRIO_HIGHEST):
      logging.debug("Increasing priority")
      op.priority -= 1
      self._ResetTimeoutStrategy()
      return True

    return False

  def GetNextLockTimeout(self):
    """Returns the next lock acquire timeout.

    """
    return self._timeout_strategy.Next()
765
766
767 class _JobProcessor(object):
768 (DEFER,
769 WAITDEP,
770 FINISHED) = range(1, 4)
771
  def __init__(self, queue, opexec_fn, job,
               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: the job queue this processor works on
    @param opexec_fn: function called to execute a single opcode
    @type job: L{_QueuedJob}
    @param job: the job to process
    @param _timeout_strategy_factory: factory creating lock timeout
      strategies (overridable for tests)

    """
    self.queue = queue
    self.opexec_fn = opexec_fn
    self.job = job
    self._timeout_strategy_factory = _timeout_strategy_factory
781
  @staticmethod
  def _FindNextOpcode(job, timeout_strategy_factory):
    """Locates the next opcode to run.

    @type job: L{_QueuedJob}
    @param job: Job object
    @param timeout_strategy_factory: Callable to create new timeout strategy
    @rtype: L{_OpExecContext}
    @return: context for the next opcode that is not yet finalized
    @raise errors.ProgrammerError: if the job is already finished or an
      opcode is unexpectedly marked as running

    """
    # Create some sort of a cache to speed up locating next opcode for future
    # lookups
    # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
    # pending and one for processed ops.
    # NOTE: the enumerate() iterator is stored on the job, so repeated calls
    # resume scanning where the previous call stopped
    if job.ops_iter is None:
      job.ops_iter = enumerate(job.ops)

    # Find next opcode to run
    while True:
      try:
        (idx, op) = job.ops_iter.next()
      except StopIteration:
        raise errors.ProgrammerError("Called for a finished job")

      if op.status == constants.OP_STATUS_RUNNING:
        # Found an opcode already marked as running
        raise errors.ProgrammerError("Called for job marked as running")

      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                             timeout_strategy_factory)

      if op.status not in constants.OPS_FINALIZED:
        return opctx

      # This is a job that was partially completed before master daemon
      # shutdown, so it can be expected that some opcodes are already
      # completed successfully (if any did error out, then the whole job
      # should have been aborted and not resubmitted for processing).
      logging.info("%s: opcode %s already processed, skipping",
                   opctx.log_prefix, opctx.summary)
821
822 @staticmethod
823 def _MarkWaitlock(job, op):
824 """Marks an opcode as waiting for locks.
825
826 The job's start timestamp is also set if necessary.
827
828 @type job: L{_QueuedJob}
829 @param job: Job object
830 @type op: L{_QueuedOpCode}
831 @param op: Opcode object
832
833 """
834 assert op in job.ops
835 assert op.status in (constants.OP_STATUS_QUEUED,
836 constants.OP_STATUS_WAITING)
837
838 update = False
839
840 op.result = None
841
842 if op.status == constants.OP_STATUS_QUEUED:
843 op.status = constants.OP_STATUS_WAITING
844 update = True
845
846 if op.start_timestamp is None:
847 op.start_timestamp = TimeStampNow()
848 update = True
849
850 if job.start_timestamp is None:
851 job.start_timestamp = op.start_timestamp
852 update = True
853
854 assert op.status == constants.OP_STATUS_WAITING
855
856 return update
857
858 @staticmethod
859 def _CheckDependencies(queue, job, opctx):
860 """Checks if an opcode has dependencies and if so, processes them.
861
862 @type queue: L{JobQueue}
863 @param queue: Queue object
864 @type job: L{_QueuedJob}
865 @param job: Job object
866 @type opctx: L{_OpExecContext}
867 @param opctx: Opcode execution context
868 @rtype: bool
869 @return: Whether opcode will be re-scheduled by dependency tracker
870
871 """
872 op = opctx.op
873
874 result = False
875
876 while opctx.jobdeps:
877 (dep_job_id, dep_status) = opctx.jobdeps[0]
878
879 (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
880 dep_status)
881 assert ht.TNonEmptyString(depmsg), "No dependency message"
882
883 logging.info("%s: %s", opctx.log_prefix, depmsg)
884
885 if depresult == _JobDependencyManager.CONTINUE:
886 # Remove dependency and continue
887 opctx.jobdeps.pop(0)
888
889 elif depresult == _JobDependencyManager.WAIT:
890 # Need to wait for notification, dependency tracker will re-add job
891 # to workerpool
892 result = True
893 break
894
895 elif depresult == _JobDependencyManager.CANCEL:
896 # Job was cancelled, cancel this job as well
897 job.Cancel()
898 assert op.status == constants.OP_STATUS_CANCELING
899 break
900
901 elif depresult in (_JobDependencyManager.WRONGSTATUS,
902 _JobDependencyManager.ERROR):
903 # Job failed or there was an error, this job must fail
904 op.status = constants.OP_STATUS_ERROR
905 op.result = _EncodeOpError(errors.OpExecError(depmsg))
906 break
907
908 else:
909 raise errors.ProgrammerError("Unknown dependency result '%s'" %
910 depresult)
911
912 return result
913
  def _ExecOpCodeUnlocked(self, opctx):
    """Processes one opcode and returns the result.

    Must be called without holding the queue lock, since executing the
    opcode may take a long time.

    @type opctx: L{_OpExecContext}
    @param opctx: context of the opcode to execute
    @rtype: tuple
    @return: (new opcode status, result); the result is C{None} unless the
      opcode finished (successfully or with an encoded error)

    """
    op = opctx.op

    assert op.status in (constants.OP_STATUS_WAITING,
                         constants.OP_STATUS_CANCELING)

    # The very last check if the job was cancelled before trying to execute
    if op.status == constants.OP_STATUS_CANCELING:
      return (constants.OP_STATUS_CANCELING, None)

    timeout = opctx.GetNextLockTimeout()

    try:
      # Make sure not to hold queue lock while calling ExecOpCode
      result = self.opexec_fn(op.input,
                              _OpExecCallbacks(self.queue, self.job, op),
                              timeout=timeout)
    except mcpu.LockAcquireTimeout:
      assert timeout is not None, "Received timeout for blocking acquire"
      logging.debug("Couldn't acquire locks in %0.6fs", timeout)

      assert op.status in (constants.OP_STATUS_WAITING,
                           constants.OP_STATUS_CANCELING)

      # Was job cancelled while we were waiting for the lock?
      if op.status == constants.OP_STATUS_CANCELING:
        return (constants.OP_STATUS_CANCELING, None)

      # Stay in waitlock while trying to re-acquire lock
      return (constants.OP_STATUS_WAITING, None)
    except CancelJob:
      logging.exception("%s: Canceling job", opctx.log_prefix)
      assert op.status == constants.OP_STATUS_CANCELING
      return (constants.OP_STATUS_CANCELING, None)

    except Exception, err: # pylint: disable=W0703
      # Any other exception marks the opcode (and thus the job) as failed
      logging.exception("%s: Caught exception in %s",
                        opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
    else:
      logging.debug("%s: %s successful",
                    opctx.log_prefix, opctx.summary)
      return (constants.OP_STATUS_SUCCESS, result)
960
961 def __call__(self, _nextop_fn=None):
962 """Continues execution of a job.
963
964 @param _nextop_fn: Callback function for tests
965 @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
966 be deferred and C{WAITDEP} if the dependency manager
967 (L{_JobDependencyManager}) will re-schedule the job when appropriate
968
969 """
970 queue = self.queue
971 job = self.job
972
973 logging.debug("Processing job %s", job.id)
974
975 queue.acquire(shared=1)
976 try:
977 opcount = len(job.ops)
978
979 assert job.writable, "Expected writable job"
980
981 # Don't do anything for finalized jobs
982 if job.CalcStatus() in constants.JOBS_FINALIZED:
983 return self.FINISHED
984
985 # Is a previous opcode still pending?
986 if job.cur_opctx:
987 opctx = job.cur_opctx
988 job.cur_opctx = None
989 else:
990 if __debug__ and _nextop_fn:
991 _nextop_fn()
992 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
993
994 op = opctx.op
995
996 # Consistency check
997 assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
998 constants.OP_STATUS_CANCELING)
999 for i in job.ops[opctx.index + 1:])
1000
1001 assert op.status in (constants.OP_STATUS_QUEUED,
1002 constants.OP_STATUS_WAITING,
1003 constants.OP_STATUS_CANCELING)
1004
1005 assert (op.priority <= constants.OP_PRIO_LOWEST and
1006 op.priority >= constants.OP_PRIO_HIGHEST)
1007
1008 waitjob = None
1009
1010 if op.status != constants.OP_STATUS_CANCELING:
1011 assert op.status in (constants.OP_STATUS_QUEUED,
1012 constants.OP_STATUS_WAITING)
1013
1014 # Prepare to start opcode
1015 if self._MarkWaitlock(job, op):
1016 # Write to disk
1017 queue.UpdateJobUnlocked(job)
1018
1019 assert op.status == constants.OP_STATUS_WAITING
1020 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1021 assert job.start_timestamp and op.start_timestamp
1022 assert waitjob is None
1023
1024 # Check if waiting for a job is necessary
1025 waitjob = self._CheckDependencies(queue, job, opctx)
1026
1027 assert op.status in (constants.OP_STATUS_WAITING,
1028 constants.OP_STATUS_CANCELING,
1029 constants.OP_STATUS_ERROR)
1030
1031 if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1032 constants.OP_STATUS_ERROR)):
1033 logging.info("%s: opcode %s waiting for locks",
1034 opctx.log_prefix, opctx.summary)
1035
1036 assert not opctx.jobdeps, "Not all dependencies were removed"
1037
1038 queue.release()
1039 try:
1040 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1041 finally:
1042 queue.acquire(shared=1)
1043
1044 op.status = op_status
1045 op.result = op_result
1046
1047 assert not waitjob
1048
1049 if op.status in (constants.OP_STATUS_WAITING,
1050 constants.OP_STATUS_QUEUED):
1051 # waiting: Couldn't get locks in time
1052 # queued: Queue is shutting down
1053 assert not op.end_timestamp
1054 else:
1055 # Finalize opcode
1056 op.end_timestamp = TimeStampNow()
1057
1058 if op.status == constants.OP_STATUS_CANCELING:
1059 assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1060 for i in job.ops[opctx.index:])
1061 else:
1062 assert op.status in constants.OPS_FINALIZED
1063
1064 if op.status == constants.OP_STATUS_QUEUED:
1065 # Queue is shutting down
1066 assert not waitjob
1067
1068 finalize = False
1069
1070 # Reset context
1071 job.cur_opctx = None
1072
1073 # In no case must the status be finalized here
1074 assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1075
1076 elif op.status == constants.OP_STATUS_WAITING or waitjob:
1077 finalize = False
1078
1079 if not waitjob and opctx.CheckPriorityIncrease():
1080 # Priority was changed, need to update on-disk file
1081 queue.UpdateJobUnlocked(job)
1082
1083 # Keep around for another round
1084 job.cur_opctx = opctx
1085
1086 assert (op.priority <= constants.OP_PRIO_LOWEST and
1087 op.priority >= constants.OP_PRIO_HIGHEST)
1088
1089 # In no case must the status be finalized here
1090 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1091
1092 else:
1093 # Ensure all opcodes so far have been successful
1094 assert (opctx.index == 0 or
1095 compat.all(i.status == constants.OP_STATUS_SUCCESS
1096 for i in job.ops[:opctx.index]))
1097
1098 # Reset context
1099 job.cur_opctx = None
1100
1101 if op.status == constants.OP_STATUS_SUCCESS:
1102 finalize = False
1103
1104 elif op.status == constants.OP_STATUS_ERROR:
1105 # If we get here, we cannot afford to check for any consistency
1106 # any more, we just want to clean up.
1107 # TODO: Actually, it wouldn't be a bad idea to start a timer
1108 # here to kill the whole process.
1109 to_encode = errors.OpExecError("Preceding opcode failed")
1110 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1111 _EncodeOpError(to_encode))
1112 finalize = True
1113 elif op.status == constants.OP_STATUS_CANCELING:
1114 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1115 "Job canceled by request")
1116 finalize = True
1117
1118 else:
1119 raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1120
1121 if opctx.index == (opcount - 1):
1122 # Finalize on last opcode
1123 finalize = True
1124
1125 if finalize:
1126 # All opcodes have been run, finalize job
1127 job.Finalize()
1128
1129 # Write to disk. If the job status is final, this is the final write
1130 # allowed. Once the file has been written, it can be archived anytime.
1131 queue.UpdateJobUnlocked(job)
1132
1133 assert not waitjob
1134
1135 if finalize:
1136 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1137 return self.FINISHED
1138
1139 assert not waitjob or queue.depmgr.JobWaiting(job)
1140
1141 if waitjob:
1142 return self.WAITDEP
1143 else:
1144 return self.DEFER
1145 finally:
1146 assert job.writable, "Job became read-only while being processed"
1147 queue.release()
1148
1149
def _EvaluateJobProcessorResult(depmgr, job, result):
  """Acts upon the status returned by L{_JobProcessor} for a job.

  To be used in a L{_JobQueueWorker}.

  """
  if result == _JobProcessor.FINISHED:
    # Job is done; wake up any jobs waiting on it
    depmgr.NotifyWaiters(job.id)
    return

  if result == _JobProcessor.DEFER:
    # Put the job back onto the worker pool's queue at its current priority
    raise workerpool.DeferTask(priority=job.CalcPriority())

  if result == _JobProcessor.WAITDEP:
    # Nothing to do here; the dependency manager will re-schedule the job
    return

  raise errors.ProgrammerError("Job processor returned unknown status %s" %
                               (result, ))
1171
1172
class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable=W0221
    """Job executor.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    assert job.writable, "Expected writable job"

    # Serialize processing of one job: if this job registered for a dependency
    # and the other job notifies before the first worker is done, the job can
    # end up in the task list more than once.
    job.processor_lock.acquire()
    try:
      return self._RunTaskInner(job)
    finally:
      job.processor_lock.release()

  def _RunTaskInner(self, job):
    """Executes a job.

    Must be called with per-job lock acquired.

    """
    queue = job.queue
    assert queue == self.pool.queue

    def _SetName(op):
      # Reflect the currently running opcode in the worker thread's name
      self.SetTaskName(self._GetWorkerName(job, op))

    _SetName(None)

    proc = mcpu.Processor(queue.context, job.id)

    # Wrap opcode execution so the thread name is kept up to date
    execop_fn = compat.partial(self._WrapExecOpCode, _SetName,
                               proc.ExecOpCode)

    outcome = _JobProcessor(queue, execop_fn, job)()
    _EvaluateJobProcessorResult(queue.depmgr, job, outcome)

  @staticmethod
  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
    """Updates the worker thread name to include a short summary of the opcode.

    @param setname_fn: Callable setting worker thread name
    @param execop_fn: Callable for executing opcode (usually
      L{mcpu.Processor.ExecOpCode})

    """
    setname_fn(op)
    try:
      return execop_fn(op, *args, **kwargs)
    finally:
      # Always reset the name, even if the opcode raised
      setname_fn(None)

  @staticmethod
  def _GetWorkerName(job, op):
    """Computes the worker thread name for a (job, opcode) pair.

    @type job: L{_QueuedJob}
    @type op: L{opcodes.OpCode}

    """
    name = "Job%s" % job.id

    if op:
      name = "/".join([name, op.TinySummary()])

    return name
1245
1246
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    """Initializes the pool with L{JOBQUEUE_THREADS} workers.

    """
    super(_JobQueueWorkerPool, self).__init__("Jq", JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    # Back-reference used by workers to reach the job queue
    self.queue = queue
1256
1257
1258 class _JobDependencyManager:
1259 """Keeps track of job dependencies.
1260
1261 """
1262 (WAIT,
1263 ERROR,
1264 CANCEL,
1265 CONTINUE,
1266 WRONGSTATUS) = range(1, 6)
1267
1268 def __init__(self, getstatus_fn, enqueue_fn):
1269 """Initializes this class.
1270
1271 """
1272 self._getstatus_fn = getstatus_fn
1273 self._enqueue_fn = enqueue_fn
1274
1275 self._waiters = {}
1276 self._lock = locking.SharedLock("JobDepMgr")
1277
1278 @locking.ssynchronized(_LOCK, shared=1)
1279 def GetLockInfo(self, requested): # pylint: disable=W0613
1280 """Retrieves information about waiting jobs.
1281
1282 @type requested: set
1283 @param requested: Requested information, see C{query.LQ_*}
1284
1285 """
1286 # No need to sort here, that's being done by the lock manager and query
1287 # library. There are no priorities for notifying jobs, hence all show up as
1288 # one item under "pending".
1289 return [("job/%s" % job_id, None, None,
1290 [("job", [job.id for job in waiters])])
1291 for job_id, waiters in self._waiters.items()
1292 if waiters]
1293
1294 @locking.ssynchronized(_LOCK, shared=1)
1295 def JobWaiting(self, job):
1296 """Checks if a job is waiting.
1297
1298 """
1299 return compat.any(job in jobs
1300 for jobs in self._waiters.values())
1301
1302 @locking.ssynchronized(_LOCK)
1303 def CheckAndRegister(self, job, dep_job_id, dep_status):
1304 """Checks if a dependency job has the requested status.
1305
1306 If the other job is not yet in a finalized status, the calling job will be
1307 notified (re-added to the workerpool) at a later point.
1308
1309 @type job: L{_QueuedJob}
1310 @param job: Job object
1311 @type dep_job_id: int
1312 @param dep_job_id: ID of dependency job
1313 @type dep_status: list
1314 @param dep_status: Required status
1315
1316 """
1317 assert ht.TJobId(job.id)
1318 assert ht.TJobId(dep_job_id)
1319 assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1320
1321 if job.id == dep_job_id:
1322 return (self.ERROR, "Job can't depend on itself")
1323
1324 # Get status of dependency job
1325 try:
1326 status = self._getstatus_fn(dep_job_id)
1327 except errors.JobLost, err:
1328 return (self.ERROR, "Dependency error: %s" % err)
1329
1330 assert status in constants.JOB_STATUS_ALL
1331
1332 job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1333
1334 if status not in constants.JOBS_FINALIZED:
1335 # Register for notification and wait for job to finish
1336 job_id_waiters.add(job)
1337 return (self.WAIT,
1338 "Need to wait for job %s, wanted status '%s'" %
1339 (dep_job_id, dep_status))
1340
1341 # Remove from waiters list
1342 if job in job_id_waiters:
1343 job_id_waiters.remove(job)
1344
1345 if (status == constants.JOB_STATUS_CANCELED and
1346 constants.JOB_STATUS_CANCELED not in dep_status):
1347 return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1348
1349 elif not dep_status or status in dep_status:
1350 return (self.CONTINUE,
1351 "Dependency job %s finished with status '%s'" %
1352 (dep_job_id, status))
1353
1354 else:
1355 return (self.WRONGSTATUS,
1356 "Dependency job %s finished with status '%s',"
1357 " not one of '%s' as required" %
1358 (dep_job_id, status, utils.CommaJoin(dep_status)))
1359
1360 def _RemoveEmptyWaitersUnlocked(self):
1361 """Remove all jobs without actual waiters.
1362
1363 """
1364 for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1365 if not waiters]:
1366 del self._waiters[job_id]
1367
1368 def NotifyWaiters(self, job_id):
1369 """Notifies all jobs waiting for a certain job ID.
1370
1371 @attention: Do not call until L{CheckAndRegister} returned a status other
1372 than C{WAITDEP} for C{job_id}, or behaviour is undefined
1373 @type job_id: int
1374 @param job_id: Job ID
1375
1376 """
1377 assert ht.TJobId(job_id)
1378
1379 self._lock.acquire()
1380 try:
1381 self._RemoveEmptyWaitersUnlocked()
1382
1383 jobs = self._waiters.pop(job_id, None)
1384 finally:
1385 self._lock.release()
1386
1387 if jobs:
1388 # Re-add jobs to workerpool
1389 logging.debug("Re-adding %s jobs which were waiting for job %s",
1390 len(jobs), job_id)
1391 self._enqueue_fn(jobs)
1392
1393
1394 class JobQueue(object):
1395 """Queue used to manage the jobs.
1396
1397 """
  def __init__(self, context, cfg):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects
    @param cfg: configuration object; only used here to obtain the initial
        list of master candidate nodes

    """
    # Job ID this process is primarily responsible for (set by PickupJob)
    self.primary_jid = None
    self.context = context
    # Weak cache of loaded jobs; entries disappear when unreferenced
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

    # The Big JobQueue lock. If a code block or method acquires it in shared
    # mode, it must guarantee concurrency with all the code acquiring it in
    # shared mode, including itself. In order not to acquire it at all,
    # concurrency must be guaranteed with all code acquiring it in shared mode
    # and all code acquiring it exclusively.
    self._lock = locking.SharedLock("JobQueue")

    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes (master candidates only; name -> primary IP)
    self._nodes = dict((n.name, n.primary_ip)
                       for n in cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node; replication to self is unnecessary
    self._nodes.pop(self._my_hostname, None)

    # TODO: Check consistency across nodes

    self._queue_size = None
    self._UpdateQueueSizeUnlocked()
    assert ht.TInt(self._queue_size)

    # Job dependencies
    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
                                        self._EnqueueJobs)

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
1451
  def _PickupJobUnlocked(self, job_id):
    """Load a job from the job queue

    Pick up a job that already is in the job queue and start/resume it.

    @param job_id: identifier of the job to pick up; becomes this process'
        primary job ID

    """
    # A job process is expected to handle only one primary job; log if a
    # second pickup is requested
    if self.primary_jid:
      logging.warning("Job process asked to pick up %s, but already has %s",
                      job_id, self.primary_jid)

    self.primary_jid = int(job_id)

    job = self._LoadJobUnlocked(job_id)

    if job is None:
      logging.warning("Job %s could not be read", job_id)
      return

    job.AddReasons(pickup=True)

    status = job.CalcStatus()
    if status == constants.JOB_STATUS_QUEUED:
      # Queued jobs can simply be re-enqueued under this process' PID
      job.SetPid(os.getpid())
      self._EnqueueJobsUnlocked([job])
      logging.info("Restarting job %s", job.id)

    elif status in (constants.JOB_STATUS_RUNNING,
                    constants.JOB_STATUS_WAITING,
                    constants.JOB_STATUS_CANCELING):
      logging.warning("Unfinished job %s found: %s", job.id, job)

      if status == constants.JOB_STATUS_WAITING:
        # Job was waiting for locks; reset its opcodes to queued and restart
        job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
        job.SetPid(os.getpid())
        self._EnqueueJobsUnlocked([job])
        logging.info("Restarting job %s", job.id)
      else:
        # Running/canceling jobs cannot be resumed safely; fail them
        to_encode = errors.OpExecError("Unclean master daemon shutdown")
        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                              _EncodeOpError(to_encode))
        job.Finalize()

    self.UpdateJobUnlocked(job)
1495
  @locking.ssynchronized(_LOCK)
  def PickupJob(self, job_id):
    """Load and start/resume a job already present in the queue.

    Lock-acquiring wrapper around L{_PickupJobUnlocked}.

    @param job_id: identifier of the job to pick up

    """
    self._PickupJobUnlocked(job_id)
1499
  def _GetRpc(self, address_list):
    """Gets RPC runner with context.

    @param address_list: list of node addresses, or None; passed straight
        through to L{rpc.JobQueueRunner}

    """
    return rpc.JobQueueRunner(self.context, address_list)
1505
1506 @locking.ssynchronized(_LOCK)
1507 def AddNode(self, node):
1508 """Register a new node with the queue.
1509
1510 @type node: L{objects.Node}
1511 @param node: the node object to be added
1512
1513 """
1514 node_name = node.name
1515 assert node_name != self._my_hostname
1516
1517 # Clean queue directory on added node
1518 result = self._GetRpc(None).call_jobqueue_purge(node_name)
1519 msg = result.fail_msg
1520 if msg:
1521 logging.warning("Cannot cleanup queue directory on node %s: %s",
1522 node_name, msg)
1523
1524 if not node.master_candidate:
1525 # remove if existing, ignoring errors
1526 self._nodes.pop(node_name, None)
1527 # and skip the replication of the job ids
1528 return
1529
1530 # Upload the whole queue excluding archived jobs
1531 files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
1532
1533 # Upload current serial file
1534 files.append(pathutils.JOB_QUEUE_SERIAL_FILE)
1535
1536 # Static address list
1537 addrs = [node.primary_ip]
1538
1539 for file_name in files:
1540 # Read file content
1541 content = utils.ReadFile(file_name)
1542
1543 result = _CallJqUpdate(self._GetRpc(addrs), [node_name],
1544 file_name, content)
1545 msg = result[node_name].fail_msg
1546 if msg:
1547 logging.error("Failed to upload file %s to node %s: %s",
1548 file_name, node_name, msg)
1549
1550 msg = result[node_name].fail_msg
1551 if msg:
1552 logging.error("Failed to set queue drained flag on node %s: %s",
1553 node_name, msg)
1554
1555 self._nodes[node_name] = node.primary_ip
1556
  @locking.ssynchronized(_LOCK)
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    # Removing an unknown node is a no-op
    self._nodes.pop(node_name, None)
1566
1567 @staticmethod
1568 def _CheckRpcResult(result, nodes, failmsg):
1569 """Verifies the status of an RPC call.
1570
1571 Since we aim to keep consistency should this node (the current
1572 master) fail, we will log errors if our rpc fail, and especially
1573 log the case when more than half of the nodes fails.
1574
1575 @param result: the data as returned from the rpc call
1576 @type nodes: list
1577 @param nodes: the list of nodes we made the call to
1578 @type failmsg: str
1579 @param failmsg: the identifier to be used for logging
1580
1581 """
1582 failed = []
1583 success = []
1584
1585 for node in nodes:
1586 msg = result[node].fail_msg
1587 if msg:
1588 failed.append(node)
1589 logging.error("RPC call %s (%s) failed on node %s: %s",
1590 result[node].call, failmsg, node, msg)
1591 else:
1592 success.append(node)
1593
1594 # +1 for the master node
1595 if (len(success) + 1) < len(failed):
1596 # TODO: Handle failing nodes
1597 logging.error("More than half of the nodes failed")
1598
1599 def _GetNodeIp(self):
1600 """Helper for returning the node name/ip list.
1601
1602 @rtype: (list, list)
1603 @return: a tuple of two lists, the first one with the node
1604 names and the second one with the node addresses
1605
1606 """
1607 # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1608 name_list = self._nodes.keys()
1609 addr_list = [self._nodes[name] for name in name_list]
1610 return name_list, addr_list
1611
1612 def _UpdateJobQueueFile(self, file_name, data, replicate):
1613 """Writes a file locally and then replicates it to all nodes.
1614
1615 This function will replace the contents of a file on the local
1616 node and then replicate it to all the other nodes we have.
1617
1618 @type file_name: str
1619 @param file_name: the path of the file to be replicated
1620 @type data: str
1621 @param data: the new contents of the file
1622 @type replicate: boolean
1623 @param replicate: whether to spread the changes to the remote nodes
1624
1625 """
1626 getents = runtime.GetEnts()
1627 utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1628 gid=getents.daemons_gid,
1629 mode=constants.JOB_QUEUE_FILES_PERMS)
1630
1631 if replicate:
1632 names, addrs = self._GetNodeIp()
1633 result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
1634 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1635
1636 def _RenameFilesUnlocked(self, rename):
1637 """Renames a file locally and then replicate the change.
1638
1639 This function will rename a file in the local queue directory
1640 and then replicate this rename to all the other nodes we have.
1641
1642 @type rename: list of (old, new)
1643 @param rename: List containing tuples mapping old to new names
1644
1645 """
1646 # Rename them locally
1647 for old, new in rename:
1648 utils.RenameFile(old, new, mkdir=True)
1649
1650 # ... and on all nodes
1651 names, addrs = self._GetNodeIp()
1652 result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1653 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1654
1655 @staticmethod
1656 def _GetJobPath(job_id):
1657 """Returns the job file for a given job id.
1658
1659 @type job_id: str
1660 @param job_id: the job identifier
1661 @rtype: str
1662 @return: the path to the job file
1663
1664 """
1665 return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1666
1667 @staticmethod
1668 def _GetArchivedJobPath(job_id):
1669 """Returns the archived job file for a give job id.
1670
1671 @type job_id: str
1672 @param job_id: the job identifier
1673 @rtype: str
1674 @return: the path to the archived job file
1675
1676 """
1677 return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
1678 jstore.GetArchiveDirectory(job_id),
1679 "job-%s" % job_id)
1680
1681 @staticmethod
1682 def _DetermineJobDirectories(archived):
1683 """Build list of directories containing job files.
1684
1685 @type archived: bool
1686 @param archived: Whether to include directories for archived jobs
1687 @rtype: list
1688
1689 """
1690 result = [pathutils.QUEUE_DIR]
1691
1692 if archived:
1693 archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
1694 result.extend(map(compat.partial(utils.PathJoin, archive_path),
1695 utils.ListVisibleFiles(archive_path)))
1696
1697 return result
1698
1699 @classmethod
1700 def _GetJobIDsUnlocked(cls, sort=True, archived=False):
1701 """Return all known job IDs.
1702
1703 The method only looks at disk because it's a requirement that all
1704 jobs are present on disk (so in the _memcache we don't have any
1705 extra IDs).
1706
1707 @type sort: boolean
1708 @param sort: perform sorting on the returned job ids
1709 @rtype: list
1710 @return: the list of job IDs
1711
1712 """
1713 jlist = []
1714
1715 for path in cls._DetermineJobDirectories(archived):
1716 for filename in utils.ListVisibleFiles(path):
1717 m = constants.JOB_FILE_RE.match(filename)
1718 if m:
1719 jlist.append(int(m.group(1)))
1720
1721 if sort:
1722 jlist.sort()
1723 return jlist
1724
1725 def _LoadJobUnlocked(self, job_id):
1726 """Loads a job from the disk or memory.
1727
1728 Given a job id, this will return the cached job object if
1729 existing, or try to load the job from the disk. If loading from
1730 disk, it will also add the job to the cache.
1731
1732 @type job_id: int
1733 @param job_id: the job id
1734 @rtype: L{_QueuedJob} or None
1735 @return: either None or the job object
1736
1737 """
1738 assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!"
1739
1740 job = self._memcache.get(job_id, None)
1741 if job:
1742 logging.debug("Found job %s in memcache", job_id)
1743 assert job.writable, "Found read-only job in memcache"
1744 return job
1745
1746 try:
1747 job = self._LoadJobFromDisk(job_id, False)
1748 if job is None:
1749 return job
1750 except errors.JobFileCorrupted:
1751 old_path = self._GetJobPath(job_id)
1752 new_path = self._GetArchivedJobPath(job_id)
1753 if old_path == new_path:
1754 # job already archived (future case)
1755 logging.exception("Can't parse job %s", job_id)
1756 else:
1757 # non-archived case
1758 logging.exception("Can't parse job %s, will archive.", job_id)
1759 self._RenameFilesUnlocked([(old_path, new_path)])
1760 return None
1761
1762 assert job.writable, "Job just loaded is not writable"
1763
1764 self._memcache[job_id] = job
1765 logging.debug("Added job %s to the cache", job_id)
1766 return job
1767
1768 def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
1769 """Load the given job file from disk.
1770
1771 Given a job file, read, load and restore it in a _QueuedJob format.
1772
1773 @type job_id: int
1774 @param job_id: job identifier
1775 @type try_archived: bool
1776 @param try_archived: Whether to try loading an archived job
1777 @rtype: L{_QueuedJob} or None
1778 @return: either None or the job object
1779
1780 """
1781 path_functions = [(self._GetJobPath, False)]
1782
1783 if try_archived:
1784 path_functions.append((self._GetArchivedJobPath, True))
1785
1786 raw_data = None
1787 archived = None
1788
1789 for (fn, archived) in path_functions:
1790 filepath = fn(job_id)
1791 logging.debug("Loading job from %s", filepath)
1792 try:
1793 raw_data = utils.ReadFile(filepath)
1794 except EnvironmentError, err:
1795 if err.errno != errno.ENOENT:
1796 raise
1797 else:
1798 break
1799
1800 if not raw_data:
1801 logging.debug("No data available for job %s", job_id)
1802 if int(job_id) == self.primary_jid:
1803 logging.warning("My own job file (%s) disappeared;"
1804 " this should only happy at cluster desctruction",
1805 job_id)
1806 if mcpu.lusExecuting[0] == 0:
1807 logging.warning("Not in execution; cleaning up myself due to missing"
1808 " job file")
1809 logging.shutdown()
1810 os._exit(1) # pylint: disable=W0212
1811 return None
1812
1813 if writable is None:
1814 writable = not archived
1815
1816 try:
1817 data = serializer.LoadJson(raw_data)
1818 job = _QueuedJob.Restore(self, data, writable, archived)
1819 except Exception, err: # pylint: disable=W0703
1820 raise errors.JobFileCorrupted(err)
1821
1822 return job
1823
1824 def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
1825 """Load the given job file from disk.
1826
1827 Given a job file, read, load and restore it in a _QueuedJob format.
1828 In case of error reading the job, it gets returned as None, and the
1829 exception is logged.
1830
1831 @type job_id: int
1832 @param job_id: job identifier
1833 @type try_archived: bool
1834 @param try_archived: Whether to try loading an archived job
1835 @rtype: L{_QueuedJob} or None
1836 @return: either None or the job object
1837
1838 """
1839 try:
1840 return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
1841 except (errors.JobFileCorrupted, EnvironmentError):
1842 logging.exception("Can't load/parse job %s", job_id)
1843 return None
1844
1845 def _UpdateQueueSizeUnlocked(self):
1846 """Update the queue size.
1847
1848 """
1849 self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1850
1851 @classmethod
1852 def SubmitManyJobs(cls, jobs):
1853 """Create and store multiple jobs.
1854
1855 """
1856 return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitManyJobs(jobs)
1857
1858 @staticmethod
1859 def _FormatSubmitError(msg, ops):
1860 """Formats errors which occurred while submitting a job.
1861
1862 """
1863 return ("%s; opcodes %s" %
1864 (msg, utils.CommaJoin(op.Summary() for op in ops)))
1865
1866 @staticmethod
1867 def _ResolveJobDependencies(resolve_fn, deps):
1868 """Resolves relative job IDs in dependencies.
1869
1870 @type resolve_fn: callable
1871 @param resolve_fn: Function to resolve a relative job ID
1872 @type deps: list
1873 @param deps: Dependencies
1874 @rtype: tuple; (boolean, string or list)
1875 @return: If successful (first tuple item), the returned list contains
1876 resolved job IDs along with the requested status; if not successful,
1877 the second element is an error message
1878
1879 """
1880 result = []
1881
1882 for (dep_job_id, dep_status) in deps:
1883 if ht.TRelativeJobId(dep_job_id):
1884 assert ht.TInt(dep_job_id) and dep_job_id < 0
1885 try:
1886 job_id = resolve_fn(dep_job_id)
1887 except IndexError:
1888 # Abort
1889 return (False, "Unable to resolve relative job ID %s" % dep_job_id)
1890 else:
1891 job_id = dep_job_id
1892
1893 result.append((job_id, dep_status))
1894
1895 return (True, result)
1896
  @locking.ssynchronized(_LOCK)
  def _EnqueueJobs(self, jobs):
    """Helper function to add jobs to worker pool's queue.

    Acquires the queue lock exclusively (via the decorator) before delegating
    to L{_EnqueueJobsUnlocked}.

    @type jobs: list
    @param jobs: List of all jobs

    """
    return self._EnqueueJobsUnlocked(jobs)
1906
1907 def _EnqueueJobsUnlocked(self, jobs):
1908 """Helper function to add jobs to worker pool's queue.
1909
1910 @type jobs: list
1911 @param jobs: List of all jobs
1912
1913 """
1914 assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
1915 self._wpool.AddManyTasks([(job, ) for job in jobs],
1916 priority=[job.CalcPriority() for job in jobs],
1917 task_id=map(_GetIdAttr, jobs))
1918
1919 def _GetJobStatusForDependencies(self, job_id):
1920 """Gets the status of a job for dependencies.
1921
1922 @type job_id: int
1923 @param job_id: Job ID
1924 @raise errors.JobLost: If job can't be found
1925
1926 """
1927 # Not using in-memory cache as doing so would require an exclusive lock
1928
1929 # Try to load from disk
1930 job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
1931
1932 if job:
1933 assert not job.writable, "Got writable job" # pylint: disable=E1101
1934
1935 if job:
1936 return job.CalcStatus()
1937
1938 raise errors.JobLost("Job %s not found" % job_id)
1939
  def UpdateJobUnlocked(self, job, replicate=True):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job
    @type replicate: boolean
    @param replicate: whether to replicate the change to remote nodes

    """
    if __debug__:
      # A job must have an end timestamp if and only if it is finalized
      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
      assert (finalized ^ (job.end_timestamp is None))
      assert job.writable, "Can't update read-only job"
      assert not job.archived, "Can't update archived job"

    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize())
    logging.debug("Writing job %s to %s", job.id, filename)
    self._UpdateJobQueueFile(filename, data, replicate)
1963
1964 def HasJobBeenFinalized(self, job_id):
1965 """Checks if a job has been finalized.
1966
1967 @type job_id: int
1968 @param job_id: Job identifier
1969 @rtype: boolean
1970 @return: True if the job has been finalized,
1971 False if the timeout has been reached,
1972 None if the job doesn't exist
1973
1974 """
1975 job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
1976 if job is not None:
1977 return job.CalcStatus() in constants.JOBS_FINALIZED
1978 elif cluster.LUClusterDestroy.clusterHasBeenDestroyed:
1979 # FIXME: The above variable is a temporary workaround until the Python job
1980 # queue is completely removed. When removing the job queue, also remove
1981 # the variable from LUClusterDestroy.
1982 return True
1983 else:
1984 return None
1985
1986 @locking.ssynchronized(_LOCK)
1987 def CancelJob(self, job_id):
1988 """Cancels a job.
1989
1990 This will only succeed if the job has not started yet.
1991
1992 @type job_id: int
1993 @param job_id: job ID of job to be cancelled.
1994
1995 """
1996 logging.info("Cancelling job %s", job_id)
1997
1998 return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
1999
2000 @locking.ssynchronized(_LOCK)
2001 def ChangeJobPriority(self, job_id, priority):
2002 """Changes a job's priority.
2003
2004 @type job_id: int
2005 @param job_id: ID of the job whose priority should be changed
2006 @type priority: int
2007 @param priority: New priority
2008
2009 """
2010 logging.info("Changing priority of job %s to %s", job_id, priority)
2011
2012 if priority not in constants.OP_PRIO_SUBMIT_VALID:
2013 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2014 raise errors.GenericError("Invalid priority %s, allowed are %s" %
2015 (priority, allowed))
2016
2017 def fn(job):
2018 (success, msg) = job.ChangePriority(priority)
2019
2020 if success:
2021 try:
2022 self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
2023 except workerpool.NoSuchTask:
2024 logging.debug("Job %s is not in workerpool at this time", job.id)
2025
2026 return (success, msg)
2027
2028 return self._ModifyJobUnlocked(job_id, fn)
2029
  def _ModifyJobUnlocked(self, job_id, mod_fn):
    """Modifies a job.

    Loads the job, applies C{mod_fn} to it and, on success, writes the
    modified job back to disk.

    @type job_id: int
    @param job_id: Job ID
    @type mod_fn: callable
    @param mod_fn: Modifying function, receiving job object as parameter,
      returning tuple of (status boolean, message string)
    @rtype: tuple
    @return: (status boolean, message string) as returned by C{mod_fn}, or
        a failure tuple if the job could not be loaded

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    assert job.writable, "Can't modify read-only job"
    assert not job.archived, "Can't modify archived job"

    (success, msg) = mod_fn(job)

    if success:
      # If the job was finalized (e.g. cancelled), this is the final write
      # allowed. The job can be archived anytime.
      self.UpdateJobUnlocked(job)

    return (success, msg)
2056
2057 def _ArchiveJobsUnlocked(self, jobs):
2058 """Archives jobs.
2059
2060 @type jobs: list of L{_QueuedJob}
2061 @param jobs: Job objects
2062 @rtype: int
2063 @return: Number of archived jobs
2064
2065 """
2066 archive_jobs = []
2067 rename_files = []
2068 for job in jobs:
2069 assert job.writable, "Can't archive read-only job"
2070 assert not job.archived, "Can't cancel archived job"
2071
2072 if job.CalcStatus() not in constants.JOBS_FINALIZED:
2073 logging.debug("Job %s is not yet done", job.id)
2074 continue
2075
2076 archive_jobs.append(job)
2077
2078 old = self._GetJobPath(job.id)
2079 new = self._GetArchivedJobPath(job.id)
2080 rename_files.append((old, new))
2081
2082 # TODO: What if 1..n files fail to rename?
2083 self._RenameFilesUnlocked(rename_files)
2084
2085 logging.debug("Successfully archived job(s) %s",
2086 utils.CommaJoin(job.id for job in archive_jobs))
2087
2088 # Since we haven't quite checked, above, if we succeeded or failed renaming
2089 # the files, we update the cached queue size from the filesystem. When we
2090 # get around to fix the TODO: above, we can use the number of actually
2091 # archived jobs to fix this.
2092 self._UpdateQueueSizeUnlocked()
2093 return len(archive_jobs)
2094
  def _Query(self, fields, qfilter):
    """Builds a query object and loads the jobs it applies to.

    @type fields: sequence
    @param fields: List of wanted fields
    @type qfilter: None or query2 filter (list)
    @param qfilter: Query filter
    @rtype: tuple
    @return: (query object, list of (job id, job object or None) tuples,
        whether all jobs were listed)

    """
    qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                       namefield="id")

    # Archived jobs are only looked at if the "archived" field is referenced
    # either as a requested field or in the filter. By default archived jobs
    # are ignored.
    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())

    job_ids = qobj.RequestedNames()

    list_all = (job_ids is None)

    if list_all:
      # Since files are added to/removed from the queue atomically, there's no
      # risk of getting the job ids in an inconsistent state.
      job_ids = self._GetJobIDsUnlocked(archived=include_archived)

    jobs = []

    for job_id in job_ids:
      job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
      if job is not None or not list_all:
        jobs.append((job_id, job))

    return (qobj, jobs, list_all)
2121
2122 def QueryJobs(self, fields, qfilter):
2123 """Returns a list of jobs in queue.
2124
2125 @type fields: sequence
2126 @param fields: List of wanted fields
2127 @type qfilter: None or query2 filter (list)
2128 @param qfilter: Query filter
2129
2130 """
2131 (qobj, ctx, _) = self._Query(fields, qfilter)
2132
2133 return query.GetQueryResponse(qobj, ctx, sort_by_name=False)
2134
2135 def OldStyleQueryJobs(self, job_ids, fields):
2136 """Returns a list of jobs in queue.
2137
2138 @type job_ids: list
2139 @param job_ids: sequence of job identifiers or None for all
2140 @type fields: list
2141 @param fields: names of fields to return
2142 @rtype: list
2143 @return: list one element per job, each element being list with
2144 the requested fields
2145
2146 """
2147 # backwards compat:
2148 job_ids = [int(jid) for jid in job_ids]
2149 qfilter = qlang.MakeSimpleFilter("id", job_ids)
2150
2151 (qobj, ctx, _) = self._Query(fields, qfilter)
2152
2153 return qobj.OldStyleQuery(ctx, sort_by_name=False)
2154
  @locking.ssynchronized(_LOCK)
  def PrepareShutdown(self):
    """Prepare to stop the job queue.

    Returns whether there are any jobs currently running. If the latter is the
    case, the job queue is not yet ready for shutdown. Once this function
    returns C{True} L{Shutdown} can be called without interfering with any job.

    @rtype: bool
    @return: Whether there are any running jobs

    """
    # Only consults the worker pool; queued-but-idle tasks don't block shutdown
    return self._wpool.HasRunningTasks()
2168
  @locking.ssynchronized(_LOCK)
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()