Merge branch 'stable-2.15' into stable-2.16
[ganeti-github.git] / lib / jqueue / exec.py
1 #
2 #
3
4 # Copyright (C) 2014 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31 """Module implementing executing of a job as a separate process
32
33 The complete protocol of initializing a job is described in the haskell
34 module Ganeti.Query.Exec
35 """
36
37 import contextlib
38 import logging
39 import os
40 import signal
41 import sys
42 import time
43
44 from ganeti import mcpu
45 from ganeti.server import masterd
46 from ganeti.rpc import transport
47 from ganeti import serializer
48 from ganeti import utils
49 from ganeti import pathutils
50 from ganeti.utils import livelock
51
52 from ganeti.jqueue import _JobProcessor
53
54
def _GetMasterInfo():
  """Read the job id, livelock name and secret parameters from the master.

  The three values arrive, in that order, over a transport built on top of
  stdin/stdout; closing that transport also closes the standard descriptors.

  @rtype: (int, string, json encoding of a list of dicts)

  """
  logging.debug("Opening transport over stdin/out")
  trans = transport.FdTransport((0, 1))
  with contextlib.closing(trans):
    logging.debug("Reading job id from the master process")
    job_id = int(trans.Call(""))
    logging.debug("Got job id %d", job_id)
    logging.debug("Reading the livelock name from the master process")
    lock_name = livelock.LiveLockName(trans.Call(""))
    logging.debug("Got livelock %s", lock_name)
    logging.debug("Reading secret parameters from the master process")
    secrets = trans.Call("")
    logging.debug("Got secret parameters.")
  return (job_id, lock_name, secrets)
75
76
def RestorePrivateValueWrapping(json):
  """Wrap private values in JSON decoded structure.

  @param json: the json-decoded value to protect.

  """
  def _Wrap(secrets_dict):
    # A None entry means "no secrets for this opcode": wrap an empty dict.
    if secrets_dict is None:
      return serializer.PrivateDict()
    return serializer.PrivateDict(secrets_dict)

  return [_Wrap(entry) for entry in json]
92
93
def main():
  """Entry point of the job-executing process.

  Sets up logging, reads the job id, livelock name and secret parameters
  from the master process (see the Haskell module Ganeti.Query.Exec for the
  protocol), installs signal handlers and then drives the job through
  L{_JobProcessor} until it is finished, honouring cancel (SIGTERM) and
  priority-change (SIGUSR1) requests in between steps.  Always exits with
  status 0; failures are logged, not propagated.

  """
  debug = int(os.environ["GNT_DEBUG"])

  logname = pathutils.GetLogFilename("jobs")
  utils.SetupLogging(logname, "job-startup", debug=debug)

  (job_id, livelock_name, secret_params_serialized) = _GetMasterInfo()

  secret_params = ""
  if secret_params_serialized:
    secret_params_json = serializer.LoadJson(secret_params_serialized)
    secret_params = RestorePrivateValueWrapping(secret_params_json)

  # Re-initialize logging now that the job id is known.
  utils.SetupLogging(logname, "job-%s" % (job_id,), debug=debug)

  try:
    logging.debug("Preparing the context and the configuration")
    context = masterd.GanetiContext(livelock_name)

    logging.debug("Registering signal handlers")

    # One-element lists so the handler closures below can mutate the flags.
    cancel = [False]
    prio_change = [False]

    def _TermHandler(signum, _frame):
      logging.info("Killed by signal %d", signum)
      cancel[0] = True
    signal.signal(signal.SIGTERM, _TermHandler)

    def _HupHandler(signum, _frame):
      logging.debug("Received signal %d, old flag was %s, will set to True",
                    signum, mcpu.sighupReceived)
      mcpu.sighupReceived[0] = True
    signal.signal(signal.SIGHUP, _HupHandler)

    def _User1Handler(signum, _frame):
      logging.info("Received signal %d, indicating priority change", signum)
      prio_change[0] = True
    signal.signal(signal.SIGUSR1, _User1Handler)

    job = context.jobqueue.SafeLoadJobFromDisk(job_id, False)

    job.SetPid(os.getpid())

    if secret_params:
      # Re-attach the unwrapped secret OS parameters to the opcodes they
      # belong to; the master stripped them before serializing the job.
      for op, secrets in zip(job.ops, secret_params):
        if hasattr(op.input, "osparams_secret"):
          op.input.osparams_secret = secrets

    execfun = mcpu.Processor(context, job_id, job_id).ExecOpCode
    proc = _JobProcessor(context.jobqueue, execfun, job)
    result = _JobProcessor.DEFER
    while result != _JobProcessor.FINISHED:
      result = proc()
      if result == _JobProcessor.WAITDEP and not cancel[0]:
        # Normally, the scheduler should avoid starting a job where the
        # dependencies are not yet finalised. So warn, but wait and continue.
        logging.warning("Got started despite a dependency not yet finished")
        time.sleep(5)
      if cancel[0]:
        logging.debug("Got cancel request, cancelling job %d", job_id)
        r = context.jobqueue.CancelJob(job_id)
        # Reload the job and rebuild the processor: CancelJob changed the
        # job's state on disk.
        job = context.jobqueue.SafeLoadJobFromDisk(job_id, False)
        proc = _JobProcessor(context.jobqueue, execfun, job)
        logging.debug("CancelJob result for job %d: %s", job_id, r)
        cancel[0] = False
      if prio_change[0]:
        logging.debug("Received priority-change request")
        try:
          fname = os.path.join(pathutils.LUXID_MESSAGE_DIR, "%d.prio" % job_id)
          new_prio = int(utils.ReadFile(fname))
          utils.RemoveFile(fname)
          logging.debug("Changing priority of job %d to %d", job_id, new_prio)
          r = context.jobqueue.ChangeJobPriority(job_id, new_prio)
          # Reload the job: ChangeJobPriority modified it on disk.
          job = context.jobqueue.SafeLoadJobFromDisk(job_id, False)
          proc = _JobProcessor(context.jobqueue, execfun, job)
          logging.debug("Result of changing priority of %d to %d: %s", job_id,
                        new_prio, r)
        except Exception:  # pylint: disable=W0703
          # Best effort: a missing/unreadable message file must not kill
          # the job, so only warn.
          logging.warning("Informed of priority change, but could not"
                          " read new priority")
        prio_change[0] = False

  except Exception:  # pylint: disable=W0703
    logging.exception("Exception when trying to run job %d", job_id)
  finally:
    logging.debug("Job %d finalized", job_id)
    logging.debug("Removing livelock file %s", livelock_name.GetPath())
    os.remove(livelock_name.GetPath())

  sys.exit(0)
186
if __name__ == "__main__":
  main()