1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2014 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31 """Module implementing the job queue handling.
32
33 """
34
35 import logging
36 import errno
37 import time
38 import weakref
39 import threading
40 import itertools
41 import operator
42 import os
43
44 try:
45 # pylint: disable=E0611
46 from pyinotify import pyinotify
47 except ImportError:
48 import pyinotify
49
50 from ganeti import asyncnotifier
51 from ganeti import constants
52 from ganeti import serializer
53 from ganeti import locking
54 from ganeti import luxi
55 from ganeti import opcodes
56 from ganeti import opcodes_base
57 from ganeti import errors
58 from ganeti import mcpu
59 from ganeti import utils
60 from ganeti import jstore
61 import ganeti.rpc.node as rpc
62 from ganeti import runtime
63 from ganeti import netutils
64 from ganeti import compat
65 from ganeti import ht
66 from ganeti import query
67 from ganeti import qlang
68 from ganeti import pathutils
69 from ganeti import vcluster
70 from ganeti.cmdlib import cluster
71
72
73 #: Retrieves "id" attribute
74 _GetIdAttr = operator.attrgetter("id")
75
76
77 class CancelJob(Exception):
78 """Special exception to cancel a job.
79
80 """
81
82
83 def TimeStampNow():
84 """Returns the current timestamp.
85
86 @rtype: tuple
87 @return: the current time in the (seconds, microseconds) format
88
89 """
90 return utils.SplitTime(time.time())
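# Editor's illustrative note (not part of the original module): TimeStampNow()
# returns the (seconds, microseconds) pair produced by utils.SplitTime, e.g.
# roughly (1400000000, 123456); utils.MergeTime is assumed to be the inverse
# helper that turns such a pair back into a float timestamp.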
91
92
93 def _CallJqUpdate(runner, names, file_name, content):
94 """Updates job queue file after virtualizing filename.
95
96 """
97 virt_file_name = vcluster.MakeVirtualPath(file_name)
98 return runner.call_jobqueue_update(names, virt_file_name, content)
99
100
101 class _QueuedOpCode(object):
102 """Encapsulates an opcode object.
103
104 @ivar log: holds the execution log and consists of tuples
105 of the form C{(log_serial, timestamp, level, message)}
106 @ivar input: the OpCode we encapsulate
107 @ivar status: the current status
108 @ivar result: the result of the LU execution
109 @ivar start_timestamp: timestamp for the start of the execution
110 @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
111 @ivar stop_timestamp: timestamp for the end of the execution
112
113 """
114 __slots__ = ["input", "status", "result", "log", "priority",
115 "start_timestamp", "exec_timestamp", "end_timestamp",
116 "__weakref__"]
117
118 def __init__(self, op):
119 """Initializes instances of this class.
120
121 @type op: L{opcodes.OpCode}
122 @param op: the opcode we encapsulate
123
124 """
125 self.input = op
126 self.status = constants.OP_STATUS_QUEUED
127 self.result = None
128 self.log = []
129 self.start_timestamp = None
130 self.exec_timestamp = None
131 self.end_timestamp = None
132
133 # Get initial priority (it might change during the lifetime of this opcode)
134 self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
135
136 @classmethod
137 def Restore(cls, state):
138 """Restore the _QueuedOpCode from the serialized form.
139
140 @type state: dict
141 @param state: the serialized state
142 @rtype: _QueuedOpCode
143 @return: a new _QueuedOpCode instance
144
145 """
146 obj = _QueuedOpCode.__new__(cls)
147 obj.input = opcodes.OpCode.LoadOpCode(state["input"])
148 obj.status = state["status"]
149 obj.result = state["result"]
150 obj.log = state["log"]
151 obj.start_timestamp = state.get("start_timestamp", None)
152 obj.exec_timestamp = state.get("exec_timestamp", None)
153 obj.end_timestamp = state.get("end_timestamp", None)
154 obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
155 return obj
156
157 def Serialize(self):
158 """Serializes this _QueuedOpCode.
159
160 @rtype: dict
161 @return: the dictionary holding the serialized state
162
163 """
164 return {
165 "input": self.input.__getstate__(),
166 "status": self.status,
167 "result": self.result,
168 "log": self.log,
169 "start_timestamp": self.start_timestamp,
170 "exec_timestamp": self.exec_timestamp,
171 "end_timestamp": self.end_timestamp,
172 "priority": self.priority,
173 }
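# Editor's illustrative sketch (not part of the original module): Serialize()
# and Restore() are intended to round-trip, so for a hypothetical opcode "op":
#
#   qop = _QueuedOpCode(op)
#   restored = _QueuedOpCode.Restore(qop.Serialize())
#   assert restored.status == constants.OP_STATUS_QUEUED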
174
175
176 class _QueuedJob(object):
177 """In-memory job representation.
178
179 This is what we use to track the user-submitted jobs. Locking must
180 be taken care of by users of this class.
181
182 @type queue: L{JobQueue}
183 @ivar queue: the parent queue
184 @ivar id: the job ID
185 @type ops: list
186 @ivar ops: the list of _QueuedOpCode that constitute the job
187 @type log_serial: int
188 @ivar log_serial: holds the index for the next log entry
189 @ivar received_timestamp: the timestamp for when the job was received
190 @ivar start_timestamp: the timestamp for start of execution
191 @ivar end_timestamp: the timestamp for end of execution
192 @ivar writable: Whether the job is allowed to be modified
193
194 """
195 # pylint: disable=W0212
196 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
197 "received_timestamp", "start_timestamp", "end_timestamp",
198 "writable", "archived",
199 "livelock", "process_id",
200 "__weakref__"]
201
202 def AddReasons(self, pickup=False):
203 """Extend the reason trail
204
205 Add the reason for all the opcodes of this job to be executed.
206
207 """
208 count = 0
209 for queued_op in self.ops:
210 op = queued_op.input
211 if pickup:
212 reason_src_prefix = constants.OPCODE_REASON_SRC_PICKUP
213 else:
214 reason_src_prefix = constants.OPCODE_REASON_SRC_OPCODE
215 reason_src = opcodes_base.NameToReasonSrc(op.__class__.__name__,
216 reason_src_prefix)
217 reason_text = "job=%d;index=%d" % (self.id, count)
218 reason = getattr(op, "reason", [])
219 reason.append((reason_src, reason_text, utils.EpochNano()))
220 op.reason = reason
221 count = count + 1
222
223 def __init__(self, queue, job_id, ops, writable):
224 """Constructor for the _QueuedJob.
225
226 @type queue: L{JobQueue}
227 @param queue: our parent queue
228 @type job_id: int
229 @param job_id: our job id
230 @type ops: list
231 @param ops: the list of opcodes we hold, which will be encapsulated
232 in _QueuedOpCodes
233 @type writable: bool
234 @param writable: Whether job can be modified
235
236 """
237 if not ops:
238 raise errors.GenericError("A job needs at least one opcode")
239
240 self.queue = queue
241 self.id = int(job_id)
242 self.ops = [_QueuedOpCode(op) for op in ops]
243 self.AddReasons()
244 self.log_serial = 0
245 self.received_timestamp = TimeStampNow()
246 self.start_timestamp = None
247 self.end_timestamp = None
248 self.archived = False
249 self.livelock = None
250 self.process_id = None
251
252 self._InitInMemory(self, writable)
253
254 assert not self.archived, "New jobs can not be marked as archived"
255
256 @staticmethod
257 def _InitInMemory(obj, writable):
258 """Initializes in-memory variables.
259
260 """
261 obj.writable = writable
262 obj.ops_iter = None
263 obj.cur_opctx = None
264
265 def __repr__(self):
266 status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
267 "id=%s" % self.id,
268 "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
269
270 return "<%s at %#x>" % (" ".join(status), id(self))
271
272 @classmethod
273 def Restore(cls, queue, state, writable, archived):
274 """Restore a _QueuedJob from serialized state:
275
276 @type queue: L{JobQueue}
277 @param queue: to which queue the restored job belongs
278 @type state: dict
279 @param state: the serialized state
280 @type writable: bool
281 @param writable: Whether job can be modified
282 @type archived: bool
283 @param archived: Whether job was already archived
284 @rtype: _QueuedJob
285 @return: the restored _QueuedJob instance
286
287 """
288 obj = _QueuedJob.__new__(cls)
289 obj.queue = queue
290 obj.id = int(state["id"])
291 obj.received_timestamp = state.get("received_timestamp", None)
292 obj.start_timestamp = state.get("start_timestamp", None)
293 obj.end_timestamp = state.get("end_timestamp", None)
294 obj.archived = archived
295 obj.livelock = state.get("livelock", None)
296 obj.process_id = state.get("process_id", None)
297 if obj.process_id is not None:
298 obj.process_id = int(obj.process_id)
299
300 obj.ops = []
301 obj.log_serial = 0
302 for op_state in state["ops"]:
303 op = _QueuedOpCode.Restore(op_state)
304 for log_entry in op.log:
305 obj.log_serial = max(obj.log_serial, log_entry[0])
306 obj.ops.append(op)
307
308 cls._InitInMemory(obj, writable)
309
310 return obj
311
312 def Serialize(self):
313 """Serialize the _JobQueue instance.
314
315 @rtype: dict
316 @return: the serialized state
317
318 """
319 return {
320 "id": self.id,
321 "ops": [op.Serialize() for op in self.ops],
322 "start_timestamp": self.start_timestamp,
323 "end_timestamp": self.end_timestamp,
324 "received_timestamp": self.received_timestamp,
325 "livelock": self.livelock,
326 "process_id": self.process_id,
327 }
328
329 def CalcStatus(self):
330 """Compute the status of this job.
331
332 This function iterates over all the _QueuedOpCodes in the job and
333 based on their status, computes the job status.
334
335 The algorithm is:
336 - if we find a cancelled opcode, or one finished with error, the job
337 status will be the same
338 - otherwise, the last opcode with a status of:
339 - waitlock
340 - canceling
341 - running
342
343 will determine the job status
344
345 - otherwise, all opcodes are either queued or successful, and the
346 job status will be the same
347
348 @return: the job status
349
350 """
351 status = constants.JOB_STATUS_QUEUED
352
353 all_success = True
354 for op in self.ops:
355 if op.status == constants.OP_STATUS_SUCCESS:
356 continue
357
358 all_success = False
359
360 if op.status == constants.OP_STATUS_QUEUED:
361 pass
362 elif op.status == constants.OP_STATUS_WAITING:
363 status = constants.JOB_STATUS_WAITING
364 elif op.status == constants.OP_STATUS_RUNNING:
365 status = constants.JOB_STATUS_RUNNING
366 elif op.status == constants.OP_STATUS_CANCELING:
367 status = constants.JOB_STATUS_CANCELING
368 break
369 elif op.status == constants.OP_STATUS_ERROR:
370 status = constants.JOB_STATUS_ERROR
371 # The whole job fails if one opcode failed
372 break
373 elif op.status == constants.OP_STATUS_CANCELED:
374 status = constants.JOB_STATUS_CANCELED
375 break
376
377 if all_success:
378 status = constants.JOB_STATUS_SUCCESS
379
380 return status
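# Editor's illustrative note (not part of the original module): with opcode
# statuses (SUCCESS, RUNNING, QUEUED) CalcStatus yields JOB_STATUS_RUNNING;
# with (SUCCESS, ERROR, QUEUED) it yields JOB_STATUS_ERROR, since one failed
# opcode fails the whole job; only all-SUCCESS yields JOB_STATUS_SUCCESS.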
381
382 def CalcPriority(self):
383 """Gets the current priority for this job.
384
385 Only unfinished opcodes are considered. When all are done, the default
386 priority is used.
387
388 @rtype: int
389
390 """
391 priorities = [op.priority for op in self.ops
392 if op.status not in constants.OPS_FINALIZED]
393
394 if not priorities:
395 # All opcodes are done, assume default priority
396 return constants.OP_PRIO_DEFAULT
397
398 return min(priorities)
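# Editor's illustrative note (not part of the original module): priorities are
# numeric and lower values mean higher priority (see CheckPriorityIncrease
# below), so min() picks the most urgent unfinished opcode; e.g. unfinished
# priorities [10, -10] give a job priority of -10.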
399
400 def GetLogEntries(self, newer_than):
401 """Selectively returns the log entries.
402
403 @type newer_than: None or int
404 @param newer_than: if this is None, return all log entries,
405 otherwise return only the log entries with serial higher
406 than this value
407 @rtype: list
408 @return: the list of the log entries selected
409
410 """
411 if newer_than is None:
412 serial = -1
413 else:
414 serial = newer_than
415
416 entries = []
417 for op in self.ops:
418 entries.extend(filter(lambda entry: entry[0] > serial, op.log))
419
420 return entries
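# Editor's illustrative note (not part of the original module): log entries are
# (log_serial, timestamp, level, message) tuples, so GetLogEntries(2) returns
# only entries with serial 3 or higher, while GetLogEntries(None) returns all.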
421
422 def MarkUnfinishedOps(self, status, result):
423 """Mark unfinished opcodes with a given status and result.
424
425 This is a utility function for marking all running or waiting to
426 be run opcodes with a given status. Opcodes which are already
427 finalized are not changed.
428
429 @param status: a given opcode status
430 @param result: the opcode result
431
432 """
433 not_marked = True
434 for op in self.ops:
435 if op.status in constants.OPS_FINALIZED:
436 assert not_marked, "Finalized opcodes found after non-finalized ones"
437 continue
438 op.status = status
439 op.result = result
440 not_marked = False
441
442 def Finalize(self):
443 """Marks the job as finalized.
444
445 """
446 self.end_timestamp = TimeStampNow()
447
448 def Cancel(self):
449 """Marks job as canceled/-ing if possible.
450
451 @rtype: tuple; (bool, string)
452 @return: Boolean describing whether job was successfully canceled or marked
453 as canceling and a text message
454
455 """
456 status = self.CalcStatus()
457
458 if status == constants.JOB_STATUS_QUEUED:
459 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
460 "Job canceled by request")
461 self.Finalize()
462 return (True, "Job %s canceled" % self.id)
463
464 elif status == constants.JOB_STATUS_WAITING:
465 # The worker will notice the new status and cancel the job
466 self.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
467 return (True, "Job %s will be canceled" % self.id)
468
469 else:
470 logging.debug("Job %s is no longer waiting in the queue", self.id)
471 return (False, "Job %s is no longer waiting in the queue" % self.id)
472
473 def ChangePriority(self, priority):
474 """Changes the job priority.
475
476 @type priority: int
477 @param priority: New priority
478 @rtype: tuple; (bool, string)
479 @return: Boolean describing whether job's priority was successfully changed
480 and a text message
481
482 """
483 status = self.CalcStatus()
484
485 if status in constants.JOBS_FINALIZED:
486 return (False, "Job %s is finished" % self.id)
487 elif status == constants.JOB_STATUS_CANCELING:
488 return (False, "Job %s is cancelling" % self.id)
489 else:
490 assert status in (constants.JOB_STATUS_QUEUED,
491 constants.JOB_STATUS_WAITING,
492 constants.JOB_STATUS_RUNNING)
493
494 changed = False
495 for op in self.ops:
496 if (op.status == constants.OP_STATUS_RUNNING or
497 op.status in constants.OPS_FINALIZED):
498 assert not changed, \
499 ("Found opcode for which priority should not be changed after"
500 " priority has been changed for previous opcodes")
501 continue
502
503 assert op.status in (constants.OP_STATUS_QUEUED,
504 constants.OP_STATUS_WAITING)
505
506 changed = True
507
508 # Set new priority (doesn't modify opcode input)
509 op.priority = priority
510
511 if changed:
512 return (True, ("Priorities of pending opcodes for job %s have been"
513 " changed to %s" % (self.id, priority)))
514 else:
515 return (False, "Job %s had no pending opcodes" % self.id)
516
517 def SetPid(self, pid):
518 """Sets the job's process ID
519
520 @type pid: int
521 @param pid: the process ID
522
523 """
524 status = self.CalcStatus()
525
526 if status in (constants.JOB_STATUS_QUEUED,
527 constants.JOB_STATUS_WAITING):
528 if self.process_id is not None:
529 logging.warning("Replacing the process id %s of job %s with %s",
530 self.process_id, self.id, pid)
531 self.process_id = pid
532 else:
533 logging.warning("Can set pid only for queued/waiting jobs")
534
535
536 class _OpExecCallbacks(mcpu.OpExecCbBase):
537
538 def __init__(self, queue, job, op):
539 """Initializes this class.
540
541 @type queue: L{JobQueue}
542 @param queue: Job queue
543 @type job: L{_QueuedJob}
544 @param job: Job object
545 @type op: L{_QueuedOpCode}
546 @param op: OpCode
547
548 """
549 super(_OpExecCallbacks, self).__init__()
550
551 assert queue, "Queue is missing"
552 assert job, "Job is missing"
553 assert op, "Opcode is missing"
554
555 self._queue = queue
556 self._job = job
557 self._op = op
558
559 def _CheckCancel(self):
560 """Raises an exception to cancel the job if asked to.
561
562 """
563 # Cancel here if we were asked to
564 if self._op.status == constants.OP_STATUS_CANCELING:
565 logging.debug("Canceling opcode")
566 raise CancelJob()
567
568 def NotifyStart(self):
569 """Mark the opcode as running, not lock-waiting.
570
571 This is called from the mcpu code as a notifier function, when the LU is
572 finally about to start the Exec() method. Of course, to have end-user
573 visible results, the opcode must be initially (before calling into
574 Processor.ExecOpCode) set to OP_STATUS_WAITING.
575
576 """
577 assert self._op in self._job.ops
578 assert self._op.status in (constants.OP_STATUS_WAITING,
579 constants.OP_STATUS_CANCELING)
580
581 # Cancel here if we were asked to
582 self._CheckCancel()
583
584 logging.debug("Opcode is now running")
585
586 self._op.status = constants.OP_STATUS_RUNNING
587 self._op.exec_timestamp = TimeStampNow()
588
589 # And finally replicate the job status
590 self._queue.UpdateJobUnlocked(self._job)
591
592 def NotifyRetry(self):
593 """Mark opcode again as lock-waiting.
594
595 This is called from the mcpu code just after calling PrepareRetry.
596 The opcode will now again acquire locks (more, hopefully).
597
598 """
599 self._op.status = constants.OP_STATUS_WAITING
600 logging.debug("Opcode will be retried. Back to waiting.")
601
602 def _AppendFeedback(self, timestamp, log_type, log_msg):
603 """Internal feedback append function, with locks
604
605 """
606 self._job.log_serial += 1
607 self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
608 self._queue.UpdateJobUnlocked(self._job, replicate=False)
609
610 def Feedback(self, *args):
611 """Append a log entry.
612
613 """
614 assert len(args) < 3
615
616 if len(args) == 1:
617 log_type = constants.ELOG_MESSAGE
618 log_msg = args[0]
619 else:
620 (log_type, log_msg) = args
621
622 # The time is split to make serialization easier and not lose
623 # precision.
624 timestamp = utils.SplitTime(time.time())
625 self._AppendFeedback(timestamp, log_type, log_msg)
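# Editor's illustrative note (not part of the original module): Feedback accepts
# either a bare message, in which case ELOG_MESSAGE is assumed, or an explicit
# (type, message) pair, e.g.:
#
#   cb.Feedback("copying data")                          # "cb" is hypothetical
#   cb.Feedback(constants.ELOG_MESSAGE, "copying data")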
626
627 def CurrentPriority(self):
628 """Returns current priority for opcode.
629
630 """
631 assert self._op.status in (constants.OP_STATUS_WAITING,
632 constants.OP_STATUS_CANCELING)
633
634 # Cancel here if we were asked to
635 self._CheckCancel()
636
637 return self._op.priority
638
639 def SubmitManyJobs(self, jobs):
640 """Submits jobs for processing.
641
642 See L{JobQueue.SubmitManyJobs}.
643
644 """
645 # Locking is done in job queue
646 return self._queue.SubmitManyJobs(jobs)
647
648
649 def _EncodeOpError(err):
650 """Encodes an error which occurred while processing an opcode.
651
652 """
653 if isinstance(err, errors.GenericError):
654 to_encode = err
655 else:
656 to_encode = errors.OpExecError(str(err))
657
658 return errors.EncodeException(to_encode)
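# Editor's illustrative note (not part of the original module): GenericError
# subclasses are encoded as-is, anything else is wrapped first, so a
# ValueError("boom") is encoded as errors.OpExecError("boom").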
659
660
661 class _TimeoutStrategyWrapper:
662 def __init__(self, fn):
663 """Initializes this class.
664
665 """
666 self._fn = fn
667 self._next = None
668
669 def _Advance(self):
670 """Gets the next timeout if necessary.
671
672 """
673 if self._next is None:
674 self._next = self._fn()
675
676 def Peek(self):
677 """Returns the next timeout.
678
679 """
680 self._Advance()
681 return self._next
682
683 def Next(self):
684 """Returns the current timeout and advances the internal state.
685
686 """
687 self._Advance()
688 result = self._next
689 self._next = None
690 return result
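# Editor's illustrative sketch (not part of the original module): Peek() returns
# the upcoming timeout without consuming it, Next() consumes it. Assuming a
# hypothetical "strategy" whose NextAttempt yields 1.0 and then 2.0:
#
#   w = _TimeoutStrategyWrapper(strategy.NextAttempt)
#   w.Peek()   # -> 1.0 (not consumed)
#   w.Next()   # -> 1.0
#   w.Next()   # -> 2.0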
691
692
693 class _OpExecContext:
694 def __init__(self, op, index, log_prefix, timeout_strategy_factory):
695 """Initializes this class.
696
697 """
698 self.op = op
699 self.index = index
700 self.log_prefix = log_prefix
701 self.summary = op.input.Summary()
702
703 # Create local copy to modify
704 if getattr(op.input, opcodes_base.DEPEND_ATTR, None):
705 self.jobdeps = op.input.depends[:]
706 else:
707 self.jobdeps = None
708
709 self._timeout_strategy_factory = timeout_strategy_factory
710 self._ResetTimeoutStrategy()
711
712 def _ResetTimeoutStrategy(self):
713 """Creates a new timeout strategy.
714
715 """
716 self._timeout_strategy = \
717 _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
718
719 def CheckPriorityIncrease(self):
720 """Checks whether priority can and should be increased.
721
722 Called when locks couldn't be acquired.
723
724 """
725 op = self.op
726
727 # Exhausted all retries and next round should not use blocking acquire
728 # for locks?
729 if (self._timeout_strategy.Peek() is None and
730 op.priority > constants.OP_PRIO_HIGHEST):
731 logging.debug("Increasing priority")
732 op.priority -= 1
733 self._ResetTimeoutStrategy()
734 return True
735
736 return False
737
738 def GetNextLockTimeout(self):
739 """Returns the next lock acquire timeout.
740
741 """
742 return self._timeout_strategy.Next()
743
744
745 class _JobProcessor(object):
746 (DEFER,
747 WAITDEP,
748 FINISHED) = range(1, 4)
749
750 def __init__(self, queue, opexec_fn, job,
751 _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
752 """Initializes this class.
753
754 """
755 self.queue = queue
756 self.opexec_fn = opexec_fn
757 self.job = job
758 self._timeout_strategy_factory = _timeout_strategy_factory
759
760 @staticmethod
761 def _FindNextOpcode(job, timeout_strategy_factory):
762 """Locates the next opcode to run.
763
764 @type job: L{_QueuedJob}
765 @param job: Job object
766 @param timeout_strategy_factory: Callable to create new timeout strategy
767
768 """
769 # Create some sort of a cache to speed up locating next opcode for future
770 # lookups
771 # TODO: Consider splitting _QueuedJob.ops into two separate lists, one for
772 # pending and one for processed ops.
773 if job.ops_iter is None:
774 job.ops_iter = enumerate(job.ops)
775
776 # Find next opcode to run
777 while True:
778 try:
779 (idx, op) = job.ops_iter.next()
780 except StopIteration:
781 raise errors.ProgrammerError("Called for a finished job")
782
783 if op.status == constants.OP_STATUS_RUNNING:
784 # Found an opcode already marked as running
785 raise errors.ProgrammerError("Called for job marked as running")
786
787 opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
788 timeout_strategy_factory)
789
790 if op.status not in constants.OPS_FINALIZED:
791 return opctx
792
793 # This is a job that was partially completed before master daemon
794 # shutdown, so it can be expected that some opcodes are already
795 # completed successfully (if any did error out, then the whole job
796 # should have been aborted and not resubmitted for processing).
797 logging.info("%s: opcode %s already processed, skipping",
798 opctx.log_prefix, opctx.summary)
799
800 @staticmethod
801 def _MarkWaitlock(job, op):
802 """Marks an opcode as waiting for locks.
803
804 The job's start timestamp is also set if necessary.
805
806 @type job: L{_QueuedJob}
807 @param job: Job object
808 @type op: L{_QueuedOpCode}
809 @param op: Opcode object
810
811 """
812 assert op in job.ops
813 assert op.status in (constants.OP_STATUS_QUEUED,
814 constants.OP_STATUS_WAITING)
815
816 update = False
817
818 op.result = None
819
820 if op.status == constants.OP_STATUS_QUEUED:
821 op.status = constants.OP_STATUS_WAITING
822 update = True
823
824 if op.start_timestamp is None:
825 op.start_timestamp = TimeStampNow()
826 update = True
827
828 if job.start_timestamp is None:
829 job.start_timestamp = op.start_timestamp
830 update = True
831
832 assert op.status == constants.OP_STATUS_WAITING
833
834 return update
835
836 @staticmethod
837 def _CheckDependencies(queue, job, opctx):
838 """Checks if an opcode has dependencies and if so, processes them.
839
840 @type queue: L{JobQueue}
841 @param queue: Queue object
842 @type job: L{_QueuedJob}
843 @param job: Job object
844 @type opctx: L{_OpExecContext}
845 @param opctx: Opcode execution context
846 @rtype: bool
847 @return: Whether opcode will be re-scheduled by dependency tracker
848
849 """
850 op = opctx.op
851
852 result = False
853
854 while opctx.jobdeps:
855 (dep_job_id, dep_status) = opctx.jobdeps[0]
856
857 (depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
858 dep_status)
859 assert ht.TNonEmptyString(depmsg), "No dependency message"
860
861 logging.info("%s: %s", opctx.log_prefix, depmsg)
862
863 if depresult == _JobDependencyManager.CONTINUE:
864 # Remove dependency and continue
865 opctx.jobdeps.pop(0)
866
867 elif depresult == _JobDependencyManager.WAIT:
868 # Need to wait for notification, dependency tracker will re-add job
869 # to workerpool
870 result = True
871 break
872
873 elif depresult == _JobDependencyManager.CANCEL:
874 # Job was cancelled, cancel this job as well
875 job.Cancel()
876 assert op.status == constants.OP_STATUS_CANCELING
877 break
878
879 elif depresult in (_JobDependencyManager.WRONGSTATUS,
880 _JobDependencyManager.ERROR):
881 # Job failed or there was an error, this job must fail
882 op.status = constants.OP_STATUS_ERROR
883 op.result = _EncodeOpError(errors.OpExecError(depmsg))
884 break
885
886 else:
887 raise errors.ProgrammerError("Unknown dependency result '%s'" %
888 depresult)
889
890 return result
891
892 def _ExecOpCodeUnlocked(self, opctx):
893 """Processes one opcode and returns the result.
894
895 """
896 op = opctx.op
897
898 assert op.status in (constants.OP_STATUS_WAITING,
899 constants.OP_STATUS_CANCELING)
900
901 # The very last check if the job was cancelled before trying to execute
902 if op.status == constants.OP_STATUS_CANCELING:
903 return (constants.OP_STATUS_CANCELING, None)
904
905 timeout = opctx.GetNextLockTimeout()
906
907 try:
908 # Make sure not to hold queue lock while calling ExecOpCode
909 result = self.opexec_fn(op.input,
910 _OpExecCallbacks(self.queue, self.job, op),
911 timeout=timeout)
912 except mcpu.LockAcquireTimeout:
913 assert timeout is not None, "Received timeout for blocking acquire"
914 logging.debug("Couldn't acquire locks in %0.6fs", timeout)
915
916 assert op.status in (constants.OP_STATUS_WAITING,
917 constants.OP_STATUS_CANCELING)
918
919 # Was job cancelled while we were waiting for the lock?
920 if op.status == constants.OP_STATUS_CANCELING:
921 return (constants.OP_STATUS_CANCELING, None)
922
923 # Stay in waitlock while trying to re-acquire lock
924 return (constants.OP_STATUS_WAITING, None)
925 except CancelJob:
926 logging.exception("%s: Canceling job", opctx.log_prefix)
927 assert op.status == constants.OP_STATUS_CANCELING
928 return (constants.OP_STATUS_CANCELING, None)
929
930 except Exception, err: # pylint: disable=W0703
931 logging.exception("%s: Caught exception in %s",
932 opctx.log_prefix, opctx.summary)
933 return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
934 else:
935 logging.debug("%s: %s successful",
936 opctx.log_prefix, opctx.summary)
937 return (constants.OP_STATUS_SUCCESS, result)
938
939 def __call__(self, _nextop_fn=None):
940 """Continues execution of a job.
941
942 @param _nextop_fn: Callback function for tests
943 @return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
944 be deferred and C{WAITDEP} if the dependency manager
945 (L{_JobDependencyManager}) will re-schedule the job when appropriate
946
947 """
948 queue = self.queue
949 job = self.job
950
951 logging.debug("Processing job %s", job.id)
952
953 try:
954 opcount = len(job.ops)
955
956 assert job.writable, "Expected writable job"
957
958 # Don't do anything for finalized jobs
959 if job.CalcStatus() in constants.JOBS_FINALIZED:
960 return self.FINISHED
961
962 # Is a previous opcode still pending?
963 if job.cur_opctx:
964 opctx = job.cur_opctx
965 job.cur_opctx = None
966 else:
967 if __debug__ and _nextop_fn:
968 _nextop_fn()
969 opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
970
971 op = opctx.op
972
973 # Consistency check
974 assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
975 constants.OP_STATUS_CANCELING)
976 for i in job.ops[opctx.index + 1:])
977
978 assert op.status in (constants.OP_STATUS_QUEUED,
979 constants.OP_STATUS_WAITING,
980 constants.OP_STATUS_CANCELING)
981
982 assert (op.priority <= constants.OP_PRIO_LOWEST and
983 op.priority >= constants.OP_PRIO_HIGHEST)
984
985 waitjob = None
986
987 if op.status != constants.OP_STATUS_CANCELING:
988 assert op.status in (constants.OP_STATUS_QUEUED,
989 constants.OP_STATUS_WAITING)
990
991 # Prepare to start opcode
992 if self._MarkWaitlock(job, op):
993 # Write to disk
994 queue.UpdateJobUnlocked(job)
995
996 assert op.status == constants.OP_STATUS_WAITING
997 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
998 assert job.start_timestamp and op.start_timestamp
999 assert waitjob is None
1000
1001 # Check if waiting for a job is necessary
1002 waitjob = self._CheckDependencies(queue, job, opctx)
1003
1004 assert op.status in (constants.OP_STATUS_WAITING,
1005 constants.OP_STATUS_CANCELING,
1006 constants.OP_STATUS_ERROR)
1007
1008 if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
1009 constants.OP_STATUS_ERROR)):
1010 logging.info("%s: opcode %s waiting for locks",
1011 opctx.log_prefix, opctx.summary)
1012
1013 assert not opctx.jobdeps, "Not all dependencies were removed"
1014
1015 (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
1016
1017 op.status = op_status
1018 op.result = op_result
1019
1020 assert not waitjob
1021
1022 if op.status in (constants.OP_STATUS_WAITING,
1023 constants.OP_STATUS_QUEUED):
1024 # waiting: Couldn't get locks in time
1025 # queued: Queue is shutting down
1026 assert not op.end_timestamp
1027 else:
1028 # Finalize opcode
1029 op.end_timestamp = TimeStampNow()
1030
1031 if op.status == constants.OP_STATUS_CANCELING:
1032 assert not compat.any(i.status != constants.OP_STATUS_CANCELING
1033 for i in job.ops[opctx.index:])
1034 else:
1035 assert op.status in constants.OPS_FINALIZED
1036
1037 if op.status == constants.OP_STATUS_QUEUED:
1038 # Queue is shutting down
1039 assert not waitjob
1040
1041 finalize = False
1042
1043 # Reset context
1044 job.cur_opctx = None
1045
1046 # In no case must the status be finalized here
1047 assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
1048
1049 elif op.status == constants.OP_STATUS_WAITING or waitjob:
1050 finalize = False
1051
1052 if not waitjob and opctx.CheckPriorityIncrease():
1053 # Priority was changed, need to update on-disk file
1054 queue.UpdateJobUnlocked(job)
1055
1056 # Keep around for another round
1057 job.cur_opctx = opctx
1058
1059 assert (op.priority <= constants.OP_PRIO_LOWEST and
1060 op.priority >= constants.OP_PRIO_HIGHEST)
1061
1062 # In no case must the status be finalized here
1063 assert job.CalcStatus() == constants.JOB_STATUS_WAITING
1064
1065 else:
1066 # Ensure all opcodes so far have been successful
1067 assert (opctx.index == 0 or
1068 compat.all(i.status == constants.OP_STATUS_SUCCESS
1069 for i in job.ops[:opctx.index]))
1070
1071 # Reset context
1072 job.cur_opctx = None
1073
1074 if op.status == constants.OP_STATUS_SUCCESS:
1075 finalize = False
1076
1077 elif op.status == constants.OP_STATUS_ERROR:
1078 # If we get here, we cannot afford to check for any consistency
1079 # any more, we just want to clean up.
1080 # TODO: Actually, it wouldn't be a bad idea to start a timer
1081 # here to kill the whole process.
1082 to_encode = errors.OpExecError("Preceding opcode failed")
1083 job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
1084 _EncodeOpError(to_encode))
1085 finalize = True
1086 elif op.status == constants.OP_STATUS_CANCELING:
1087 job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1088 "Job canceled by request")
1089 finalize = True
1090
1091 else:
1092 raise errors.ProgrammerError("Unknown status '%s'" % op.status)
1093
1094 if opctx.index == (opcount - 1):
1095 # Finalize on last opcode
1096 finalize = True
1097
1098 if finalize:
1099 # All opcodes have been run, finalize job
1100 job.Finalize()
1101
1102 # Write to disk. If the job status is final, this is the final write
1103 # allowed. Once the file has been written, it can be archived anytime.
1104 queue.UpdateJobUnlocked(job)
1105
1106 assert not waitjob
1107
1108 if finalize:
1109 logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
1110 return self.FINISHED
1111
1112 assert not waitjob or queue.depmgr.JobWaiting(job)
1113
1114 if waitjob:
1115 return self.WAITDEP
1116 else:
1117 return self.DEFER
1118 finally:
1119 assert job.writable, "Job became read-only while being processed"
1120
1121
1122 class _JobDependencyManager:
1123 """Keeps track of job dependencies.
1124
1125 """
1126 (WAIT,
1127 ERROR,
1128 CANCEL,
1129 CONTINUE,
1130 WRONGSTATUS) = range(1, 6)
1131
1132 def __init__(self, getstatus_fn):
1133 """Initializes this class.
1134
1135 """
1136 self._getstatus_fn = getstatus_fn
1137
1138 self._waiters = {}
1139
1140 def JobWaiting(self, job):
1141 """Checks if a job is waiting.
1142
1143 """
1144 return compat.any(job in jobs
1145 for jobs in self._waiters.values())
1146
1147 def CheckAndRegister(self, job, dep_job_id, dep_status):
1148 """Checks if a dependency job has the requested status.
1149
1150 If the other job is not yet in a finalized status, the calling job will be
1151 notified (re-added to the workerpool) at a later point.
1152
1153 @type job: L{_QueuedJob}
1154 @param job: Job object
1155 @type dep_job_id: int
1156 @param dep_job_id: ID of dependency job
1157 @type dep_status: list
1158 @param dep_status: Required status
1159
1160 """
1161 assert ht.TJobId(job.id)
1162 assert ht.TJobId(dep_job_id)
1163 assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
1164
1165 if job.id == dep_job_id:
1166 return (self.ERROR, "Job can't depend on itself")
1167
1168 # Get status of dependency job
1169 try:
1170 status = self._getstatus_fn(dep_job_id)
1171 except errors.JobLost, err:
1172 return (self.ERROR, "Dependency error: %s" % err)
1173
1174 assert status in constants.JOB_STATUS_ALL
1175
1176 job_id_waiters = self._waiters.setdefault(dep_job_id, set())
1177
1178 if status not in constants.JOBS_FINALIZED:
1179 # Register for notification and wait for job to finish
1180 job_id_waiters.add(job)
1181 return (self.WAIT,
1182 "Need to wait for job %s, wanted status '%s'" %
1183 (dep_job_id, dep_status))
1184
1185 # Remove from waiters list
1186 if job in job_id_waiters:
1187 job_id_waiters.remove(job)
1188
1189 if (status == constants.JOB_STATUS_CANCELED and
1190 constants.JOB_STATUS_CANCELED not in dep_status):
1191 return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
1192
1193 elif not dep_status or status in dep_status:
1194 return (self.CONTINUE,
1195 "Dependency job %s finished with status '%s'" %
1196 (dep_job_id, status))
1197
1198 else:
1199 return (self.WRONGSTATUS,
1200 "Dependency job %s finished with status '%s',"
1201 " not one of '%s' as required" %
1202 (dep_job_id, status, utils.CommaJoin(dep_status)))
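# Editor's illustrative note (not part of the original module): for a dependency
# declared as (dep_job_id, [JOB_STATUS_SUCCESS]), CheckAndRegister returns WAIT
# while the dependency is unfinished, CONTINUE once it succeeds, CANCEL if it
# was cancelled (and cancellation was not listed as acceptable), and WRONGSTATUS
# for any other final state; an empty status list accepts any finalized state.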
1203
1204 def _RemoveEmptyWaitersUnlocked(self):
1205 """Remove all jobs without actual waiters.
1206
1207 """
1208 for job_id in [job_id for (job_id, waiters) in self._waiters.items()
1209 if not waiters]:
1210 del self._waiters[job_id]
1211
1212
1213 class JobQueue(object):
1214 """Queue used to manage the jobs.
1215
1216 """
1217 def __init__(self, context, cfg):
1218 """Constructor for JobQueue.
1219
1220 The constructor will initialize the job queue object and then
1221 start loading the current jobs from disk, either for starting them
1222 (if they were queued) or for aborting them (if they were already
1223 running).
1224
1225 @type context: GanetiContext
1226 @param context: the context object for access to the configuration
1227 data and other ganeti objects
1228
1229 """
1230 self.context = context
1231 self._memcache = weakref.WeakValueDictionary()
1232 self._my_hostname = netutils.Hostname.GetSysName()
1233
1234 # Get initial list of nodes
1235 self._nodes = dict((n.name, n.primary_ip)
1236 for n in cfg.GetAllNodesInfo().values()
1237 if n.master_candidate)
1238
1239 # Remove master node
1240 self._nodes.pop(self._my_hostname, None)
1241
1242 # Job dependencies
1243 self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies)
1244
1245 def _GetRpc(self, address_list):
1246 """Gets RPC runner with context.
1247
1248 """
1249 return rpc.JobQueueRunner(self.context, address_list)
1250
1251 @staticmethod
1252 def _CheckRpcResult(result, nodes, failmsg):
1253 """Verifies the status of an RPC call.
1254
1255 Since we aim to keep consistency should this node (the current
1256 master) fail, we will log errors if our rpc calls fail, and especially
1257 log the case when more than half of the nodes fail.
1258
1259 @param result: the data as returned from the rpc call
1260 @type nodes: list
1261 @param nodes: the list of nodes we made the call to
1262 @type failmsg: str
1263 @param failmsg: the identifier to be used for logging
1264
1265 """
1266 failed = []
1267 success = []
1268
1269 for node in nodes:
1270 msg = result[node].fail_msg
1271 if msg:
1272 failed.append(node)
1273 logging.error("RPC call %s (%s) failed on node %s: %s",
1274 result[node].call, failmsg, node, msg)
1275 else:
1276 success.append(node)
1277
1278 # +1 for the master node
1279 if (len(success) + 1) < len(failed):
1280 # TODO: Handle failing nodes
1281 logging.error("More than half of the nodes failed")
1282
1283 def _GetNodeIp(self):
1284 """Helper for returning the node name/ip list.
1285
1286 @rtype: (list, list)
1287 @return: a tuple of two lists, the first one with the node
1288 names and the second one with the node addresses
1289
1290 """
1291 # TODO: Change to "tuple(map(list, zip(*self._nodes.items())))"?
1292 name_list = self._nodes.keys()
1293 addr_list = [self._nodes[name] for name in name_list]
1294 return name_list, addr_list
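# Editor's illustrative note (not part of the original module): with
# self._nodes == {"node2.example.com": "192.0.2.2"} this returns
# (["node2.example.com"], ["192.0.2.2"]), keeping names and addresses aligned
# by index.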
1295
1296 def _UpdateJobQueueFile(self, file_name, data, replicate):
1297 """Writes a file locally and then replicates it to all nodes.
1298
1299 This function will replace the contents of a file on the local
1300 node and then replicate it to all the other nodes we have.
1301
1302 @type file_name: str
1303 @param file_name: the path of the file to be replicated
1304 @type data: str
1305 @param data: the new contents of the file
1306 @type replicate: boolean
1307 @param replicate: whether to spread the changes to the remote nodes
1308
1309 """
1310 getents = runtime.GetEnts()
1311 utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
1312 gid=getents.daemons_gid,
1313 mode=constants.JOB_QUEUE_FILES_PERMS)
1314
1315 if replicate:
1316 names, addrs = self._GetNodeIp()
1317 result = _CallJqUpdate(self._GetRpc(addrs), names, file_name, data)
1318 self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1319
1320 def _RenameFilesUnlocked(self, rename):
1321 """Renames a file locally and then replicate the change.
1322
1323 This function will rename a file in the local queue directory
1324 and then replicate this rename to all the other nodes we have.
1325
1326 @type rename: list of (old, new)
1327 @param rename: List containing tuples mapping old to new names
1328
1329 """
1330 # Rename them locally
1331 for old, new in rename:
1332 utils.RenameFile(old, new, mkdir=True)
1333
1334 # ... and on all nodes
1335 names, addrs = self._GetNodeIp()
1336 result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1337 self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1338
1339 @staticmethod
1340 def _GetJobPath(job_id):
1341 """Returns the job file for a given job id.
1342
1343 @type job_id: str
1344 @param job_id: the job identifier
1345 @rtype: str
1346 @return: the path to the job file
1347
1348 """
1349 return utils.PathJoin(pathutils.QUEUE_DIR, "job-%s" % job_id)
1350
1351 @staticmethod
1352 def _GetArchivedJobPath(job_id):
1353 """Returns the archived job file for a give job id.
1354
1355 @type job_id: str
1356 @param job_id: the job identifier
1357 @rtype: str
1358 @return: the path to the archived job file
1359
1360 """
1361 return utils.PathJoin(pathutils.JOB_QUEUE_ARCHIVE_DIR,
1362 jstore.GetArchiveDirectory(job_id),
1363 "job-%s" % job_id)
1364
1365 @staticmethod
1366 def _DetermineJobDirectories(archived):
1367 """Build list of directories containing job files.
1368
1369 @type archived: bool
1370 @param archived: Whether to include directories for archived jobs
1371 @rtype: list
1372
1373 """
1374 result = [pathutils.QUEUE_DIR]
1375
1376 if archived:
1377 archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
1378 result.extend(map(compat.partial(utils.PathJoin, archive_path),
1379 utils.ListVisibleFiles(archive_path)))
1380
1381 return result
1382
1383 @classmethod
1384 def _GetJobIDsUnlocked(cls, sort=True, archived=False):
1385 """Return all known job IDs.
1386
1387 The method only looks at disk because it's a requirement that all
1388 jobs are present on disk (so in the _memcache we don't have any
1389 extra IDs).
1390
1391 @type sort: boolean
1392 @param sort: perform sorting on the returned job ids
1393 @rtype: list
1394 @return: the list of job IDs
1395
1396 """
1397 jlist = []
1398
1399 for path in cls._DetermineJobDirectories(archived):
1400 for filename in utils.ListVisibleFiles(path):
1401 m = constants.JOB_FILE_RE.match(filename)
1402 if m:
1403 jlist.append(int(m.group(1)))
1404
1405 if sort:
1406 jlist.sort()
1407 return jlist
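# Editor's illustrative note (not part of the original module): only filenames
# matching constants.JOB_FILE_RE (the "job-<number>" pattern written by
# _GetJobPath) are considered, so a queue directory containing job-3, job-10
# and assorted lock files yields [3, 10] once sorted.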
1408
1409 def _LoadJobUnlocked(self, job_id):
1410 """Loads a job from the disk or memory.
1411
1412 Given a job id, this will return the cached job object if
1413 existing, or try to load the job from the disk. If loading from
1414 disk, it will also add the job to the cache.
1415
1416 @type job_id: int
1417 @param job_id: the job id
1418 @rtype: L{_QueuedJob} or None
1419 @return: either None or the job object
1420
1421 """
1422 assert isinstance(job_id, int), "Job queue: Supplied job id is not an int!"
1423
1424 job = self._memcache.get(job_id, None)
1425 if job:
1426 logging.debug("Found job %s in memcache", job_id)
1427 assert job.writable, "Found read-only job in memcache"
1428 return job
1429
1430 try:
1431 job = self._LoadJobFromDisk(job_id, False)
1432 if job is None:
1433 return job
1434 except errors.JobFileCorrupted:
1435 old_path = self._GetJobPath(job_id)
1436 new_path = self._GetArchivedJobPath(job_id)
1437 if old_path == new_path:
1438 # job already archived (future case)
1439 logging.exception("Can't parse job %s", job_id)
1440 else:
1441 # non-archived case
1442 logging.exception("Can't parse job %s, will archive.", job_id)
1443 self._RenameFilesUnlocked([(old_path, new_path)])
1444 return None
1445
1446 assert job.writable, "Job just loaded is not writable"
1447
1448 self._memcache[job_id] = job
1449 logging.debug("Added job %s to the cache", job_id)
1450 return job
1451
1452 def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
1453 """Load the given job file from disk.
1454
1455 Given a job file, read, load and restore it in a _QueuedJob format.
1456
1457 @type job_id: int
1458 @param job_id: job identifier
1459 @type try_archived: bool
1460 @param try_archived: Whether to try loading an archived job
1461 @rtype: L{_QueuedJob} or None
1462 @return: either None or the job object
1463
1464 """
1465 path_functions = [(self._GetJobPath, False)]
1466
1467 if try_archived:
1468 path_functions.append((self._GetArchivedJobPath, True))
1469
1470 raw_data = None
1471 archived = None
1472
1473 for (fn, archived) in path_functions:
1474 filepath = fn(job_id)
1475 logging.debug("Loading job from %s", filepath)
1476 try:
1477 raw_data = utils.ReadFile(filepath)
1478 except EnvironmentError, err:
1479 if err.errno != errno.ENOENT:
1480 raise
1481 else:
1482 break
1483
1484 if not raw_data:
1485 logging.debug("No data available for job %s", job_id)
1486 return None
1487
1488 if writable is None:
1489 writable = not archived
1490
1491 try:
1492 data = serializer.LoadJson(raw_data)
1493 job = _QueuedJob.Restore(self, data, writable, archived)
1494 except Exception, err: # pylint: disable=W0703
1495 raise errors.JobFileCorrupted(err)
1496
1497 return job
1498
1499 def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
1500 """Load the given job file from disk.
1501
1502 Given a job file, read, load and restore it in a _QueuedJob format.
1503 In case of an error while reading the job, None is returned and the
1504 exception is logged.
1505
1506 @type job_id: int
1507 @param job_id: job identifier
1508 @type try_archived: bool
1509 @param try_archived: Whether to try loading an archived job
1510 @rtype: L{_QueuedJob} or None
1511 @return: either None or the job object
1512
1513 """
1514 try:
1515 return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
1516 except (errors.JobFileCorrupted, EnvironmentError):
1517 logging.exception("Can't load/parse job %s", job_id)
1518 return None
1519
1520 @classmethod
1521 def SubmitManyJobs(cls, jobs):
1522 """Create and store multiple jobs.
1523
1524 """
1525 return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitManyJobs(jobs)
1526
1527 @staticmethod
1528 def _ResolveJobDependencies(resolve_fn, deps):
1529 """Resolves relative job IDs in dependencies.
1530
1531 @type resolve_fn: callable
1532 @param resolve_fn: Function to resolve a relative job ID
1533 @type deps: list
1534 @param deps: Dependencies
1535 @rtype: tuple; (boolean, string or list)
1536 @return: If successful (first tuple item), the returned list contains
1537 resolved job IDs along with the requested status; if not successful,
1538 the second element is an error message
1539
1540 """
1541 result = []
1542
1543 for (dep_job_id, dep_status) in deps:
1544 if ht.TRelativeJobId(dep_job_id):
1545 assert ht.TInt(dep_job_id) and dep_job_id < 0
1546 try:
1547 job_id = resolve_fn(dep_job_id)
1548 except IndexError:
1549 # Abort
1550 return (False, "Unable to resolve relative job ID %s" % dep_job_id)
1551 else:
1552 job_id = dep_job_id
1553
1554 result.append((job_id, dep_status))
1555
1556 return (True, result)
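# Editor's illustrative note (not part of the original module): relative job IDs
# are negative and refer to jobs submitted earlier in the same request, so with
# a hypothetical resolve_fn mapping -1 to job 123, the dependency list
# [(-1, [JOB_STATUS_SUCCESS])] resolves to (True, [(123, [JOB_STATUS_SUCCESS])]).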
1557
1558 def _GetJobStatusForDependencies(self, job_id):
1559 """Gets the status of a job for dependencies.
1560
1561 @type job_id: int
1562 @param job_id: Job ID
1563 @raise errors.JobLost: If job can't be found
1564
1565 """
1566 # Not using in-memory cache as doing so would require an exclusive lock
1567
1568 # Try to load from disk
1569 job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
1570
1571 if job:
1572 assert not job.writable, "Got writable job" # pylint: disable=E1101
1573
1574 if job:
1575 return job.CalcStatus()
1576
1577 raise errors.JobLost("Job %s not found" % job_id)
1578
1579 def UpdateJobUnlocked(self, job, replicate=True):
1580 """Update a job's on disk storage.
1581
1582 After a job has been modified, this function needs to be called in
1583 order to write the changes to disk and replicate them to the other
1584 nodes.
1585
1586 @type job: L{_QueuedJob}
1587 @param job: the changed job
1588 @type replicate: boolean
1589 @param replicate: whether to replicate the change to remote nodes
1590
1591 """
1592 if __debug__:
1593 finalized = job.CalcStatus() in constants.JOBS_FINALIZED
1594 assert (finalized ^ (job.end_timestamp is None))
1595 assert job.writable, "Can't update read-only job"
1596 assert not job.archived, "Can't update archived job"
1597
1598 filename = self._GetJobPath(job.id)
1599 data = serializer.DumpJson(job.Serialize())
1600 logging.debug("Writing job %s to %s", job.id, filename)
1601 self._UpdateJobQueueFile(filename, data, replicate)
1602
1603 def HasJobBeenFinalized(self, job_id):
1604 """Checks if a job has been finalized.
1605
1606 @type job_id: int
1607 @param job_id: Job identifier
1608 @rtype: boolean
1609 @return: True if the job has been finalized,
1610 False if the job exists but has not been finalized,
1611 None if the job doesn't exist
1612
1613 """
1614 job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
1615 if job is not None:
1616 return job.CalcStatus() in constants.JOBS_FINALIZED
1617 elif cluster.LUClusterDestroy.clusterHasBeenDestroyed:
1618 # FIXME: The above variable is a temporary workaround until the Python job
1619 # queue is completely removed. When removing the job queue, also remove
1620 # the variable from LUClusterDestroy.
1621 return True
1622 else:
1623 return None
1624
1625 def CancelJob(self, job_id):
1626 """Cancels a job.
1627
1628 This will only succeed if the job has not started yet.
1629
1630 @type job_id: int
1631 @param job_id: job ID of job to be cancelled.
1632
1633 """
1634 logging.info("Cancelling job %s", job_id)
1635
1636 return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
1637
1638 def ChangeJobPriority(self, job_id, priority):
1639 """Changes a job's priority.
1640
1641 @type job_id: int
1642 @param job_id: ID of the job whose priority should be changed
1643 @type priority: int
1644 @param priority: New priority
1645
1646 """
1647 logging.info("Changing priority of job %s to %s", job_id, priority)
1648
1649 if priority not in constants.OP_PRIO_SUBMIT_VALID:
1650 allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
1651 raise errors.GenericError("Invalid priority %s, allowed are %s" %
1652 (priority, allowed))
1653
1654 def fn(job):
1655 (success, msg) = job.ChangePriority(priority)
1656 return (success, msg)
1657
1658 return self._ModifyJobUnlocked(job_id, fn)
1659
1660 def _ModifyJobUnlocked(self, job_id, mod_fn):
1661 """Modifies a job.
1662
1663 @type job_id: int
1664 @param job_id: Job ID
1665 @type mod_fn: callable
1666 @param mod_fn: Modifying function, receiving job object as parameter,
1667 returning tuple of (status boolean, message string)
1668
1669 """
1670 job = self._LoadJobUnlocked(job_id)
1671 if not job:
1672 logging.debug("Job %s not found", job_id)
1673 return (False, "Job %s not found" % job_id)
1674
1675 assert job.writable, "Can't modify read-only job"
1676 assert not job.archived, "Can't modify archived job"
1677
1678 (success, msg) = mod_fn(job)
1679
1680 if success:
1681 # If the job was finalized (e.g. cancelled), this is the final write
1682 # allowed. The job can be archived anytime.
1683 self.UpdateJobUnlocked(job)
1684
1685 return (success, msg)