Do not add new inotify watchers on timer
[ganeti-github.git] / lib / jqueue / exec.py
1 #
2 #
3
4 # Copyright (C) 2014 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31 """Module implementing the execution of a job as a separate process
32
33 The complete protocol of initializing a job is described in the Haskell
34 module Ganeti.Query.Exec.
35 """
36
37 import contextlib
38 import logging
39 import os
40 import signal
41 import sys
42 import time
43
44 from ganeti import mcpu
45 from ganeti.server import masterd
46 from ganeti.rpc import transport
47 from ganeti import utils
48 from ganeti import pathutils
49 from ganeti.utils import livelock
50
51
def _GetMasterInfo():
  """Reads the job id and the livelock name sent by the master process.

  Both values are requested over a transport bound to file descriptors 0
  and 1 (standard input and output). The transport is closed before this
  function returns, which also closes standard input/output.

  @rtype: tuple
  @return: C{(job_id, livelock_name)}, where C{job_id} is an int and
      C{livelock_name} is a L{livelock.LiveLockName}

  """
  logging.debug("Opening transport over stdin/out")
  trans = transport.FdTransport((0, 1))
  with contextlib.closing(trans):
    logging.debug("Reading job id from the master process")
    raw_id = trans.Call("")
    job_id = int(raw_id)
    logging.debug("Got job id %d", job_id)

    logging.debug("Reading the livelock name from the master process")
    raw_name = trans.Call("")
    livelock_name = livelock.LiveLockName(raw_name)
    logging.debug("Got livelock %s", livelock_name)

  return (job_id, livelock_name)
67
68
def _ChangePriority(context, job_id):
  """Applies a pending priority change to a running job.

  The new priority is read from the file C{<job_id>.prio} in
  C{pathutils.LUXID_MESSAGE_DIR}, and the file is then removed. Any failure
  is logged (with its traceback) and otherwise ignored, so that the job
  keeps running.

  @param context: the L{masterd.GanetiContext} of this job process
  @type job_id: int
  @param job_id: the id of the job whose priority should change

  """
  logging.debug("Received priority-change request")
  try:
    fname = os.path.join(pathutils.LUXID_MESSAGE_DIR, "%d.prio" % job_id)
    new_prio = int(utils.ReadFile(fname))
    utils.RemoveFile(fname)
    logging.debug("Changing priority of job %d to %d", job_id, new_prio)
    r = context.jobqueue.ChangeJobPriority(job_id, new_prio)
    logging.debug("Result of changing priority of %d to %d: %s", job_id,
                  new_prio, r)
  except Exception: # pylint: disable=W0703
    # Best effort: keep the job running, but record why the change failed.
    logging.warning("Informed of priority change, but could not"
                    " read new priority", exc_info=True)


def main():
  """Runs a single job in this process on behalf of the master.

  The job id and the livelock name are read from the master over
  stdin/stdout. The job is then picked up from the queue and polled once
  per second until it is finalized. While polling:
    - a SIGTERM makes the loop request cancellation of the job;
    - a SIGUSR1 makes the loop apply a priority change;
    - a SIGHUP sets C{mcpu.sighupReceived[0]}.

  Afterwards the queue is shut down and the livelock file is removed. The
  process exits with status 0 if the job ran and the queue shut down
  cleanly, and 1 otherwise (including when the livelock file cannot be
  removed).

  Requires the C{GNT_DEBUG} environment variable to hold an integer.

  """
  debug = int(os.environ["GNT_DEBUG"])

  logname = pathutils.GetLogFilename("jobs")
  utils.SetupLogging(logname, "job-startup", debug=debug)

  (job_id, livelock_name) = _GetMasterInfo()

  utils.SetupLogging(logname, "job-%s" % (job_id,), debug=debug)

  exit_code = 1
  try:
    logging.debug("Preparing the context and the configuration")
    context = masterd.GanetiContext(livelock_name)

    logging.debug("Registering signal handlers")

    # Signal handlers only set flags. They are one-element lists so that the
    # closures can mutate them. The real work happens in the polling loop.
    cancel = [False]
    prio_change = [False]

    def _TermHandler(signum, _frame):
      logging.info("Killed by signal %d", signum)
      cancel[0] = True
    signal.signal(signal.SIGTERM, _TermHandler)

    def _HupHandler(signum, _frame):
      # Log the flag's value, not the list that holds it.
      logging.debug("Received signal %d, old flag was %s, will set to True",
                    signum, mcpu.sighupReceived[0])
      mcpu.sighupReceived[0] = True
    signal.signal(signal.SIGHUP, _HupHandler)

    def _User1Handler(signum, _frame):
      logging.info("Received signal %d, indicating priority change", signum)
      prio_change[0] = True
    signal.signal(signal.SIGUSR1, _User1Handler)

    logging.debug("Picking up job %d", job_id)
    context.jobqueue.PickupJob(job_id)

    # waiting for the job to finish
    time.sleep(1)
    while not context.jobqueue.HasJobBeenFinalized(job_id):
      # Clear each flag *before* acting on it. A signal that arrives while
      # the request is being processed then sets the flag again and is
      # handled on the next iteration, instead of being lost.
      if cancel[0]:
        cancel[0] = False
        logging.debug("Got cancel request, cancelling job %d", job_id)
        r = context.jobqueue.CancelJob(job_id)
        logging.debug("CancelJob result for job %d: %s", job_id, r)
      if prio_change[0]:
        prio_change[0] = False
        _ChangePriority(context, job_id)
      time.sleep(1)

    # wait until the queue finishes
    logging.debug("Waiting for the queue to finish")
    while context.jobqueue.PrepareShutdown():
      time.sleep(1)
    logging.debug("Shutting the queue down")
    context.jobqueue.Shutdown()
    exit_code = 0
  except Exception: # pylint: disable=W0703
    logging.exception("Exception when trying to run job %d", job_id)
  finally:
    logging.debug("Job %d finalized", job_id)
    livelock_path = livelock_name.GetPath()
    logging.debug("Removing livelock file %s", livelock_path)
    try:
      os.remove(livelock_path)
    except OSError:
      # Record the failure in the job log and still exit through sys.exit,
      # keeping a nonzero status, rather than escaping with a traceback.
      logging.exception("Could not remove livelock file %s", livelock_path)
      exit_code = 1

  sys.exit(exit_code)
148
if __name__ == "__main__":
  # Run the job process only when executed as a script, not when imported.
  main()