Merge branch 'stable-2.16' into stable-2.17
[ganeti-github.git] / lib / jstore.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31 """Module implementing the job queue handling."""
32
33 import errno
34 import os
35
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import runtime
39 from ganeti import utils
40 from ganeti import pathutils
41
42
43 JOBS_PER_ARCHIVE_DIRECTORY = constants.JSTORE_JOBS_PER_ARCHIVE_DIRECTORY
44
45
46 def _ReadNumericFile(file_name):
47 """Reads a file containing a number.
48
49 @rtype: None or int
50 @return: None if file is not found, otherwise number
51
52 """
53 try:
54 contents = utils.ReadFile(file_name)
55 except EnvironmentError, err:
56 if err.errno in (errno.ENOENT, ):
57 return None
58 raise
59
60 try:
61 return int(contents)
62 except (ValueError, TypeError), err:
63 # Couldn't convert to int
64 raise errors.JobQueueError("Content of file '%s' is not numeric: %s" %
65 (file_name, err))
66
67
68 def ReadSerial():
69 """Read the serial file.
70
71 The queue should be locked while this function is called.
72
73 """
74 return _ReadNumericFile(pathutils.JOB_QUEUE_SERIAL_FILE)
75
76
77 def ReadVersion():
78 """Read the queue version.
79
80 The queue should be locked while this function is called.
81
82 """
83 return _ReadNumericFile(pathutils.JOB_QUEUE_VERSION_FILE)
84
85
86 def InitAndVerifyQueue(must_lock):
87 """Open and lock job queue.
88
89 If necessary, the queue is automatically initialized.
90
91 @type must_lock: bool
92 @param must_lock: Whether an exclusive lock must be held.
93 @rtype: utils.FileLock
94 @return: Lock object for the queue. This can be used to change the
95 locking mode.
96
97 """
98 getents = runtime.GetEnts()
99
100 # Lock queue
101 queue_lock = utils.FileLock.Open(pathutils.JOB_QUEUE_LOCK_FILE)
102 try:
103 # The queue needs to be locked in exclusive mode to write to the serial and
104 # version files.
105 if must_lock:
106 queue_lock.Exclusive(blocking=True)
107 holding_lock = True
108 else:
109 try:
110 queue_lock.Exclusive(blocking=False)
111 holding_lock = True
112 except errors.LockError:
113 # Ignore errors and assume the process keeping the lock checked
114 # everything.
115 holding_lock = False
116
117 if holding_lock:
118 # Verify version
119 version = ReadVersion()
120 if version is None:
121 # Write new version file
122 utils.WriteFile(pathutils.JOB_QUEUE_VERSION_FILE,
123 uid=getents.masterd_uid, gid=getents.daemons_gid,
124 mode=constants.JOB_QUEUE_FILES_PERMS,
125 data="%s\n" % constants.JOB_QUEUE_VERSION)
126
127 # Read again
128 version = ReadVersion()
129
130 if version != constants.JOB_QUEUE_VERSION:
131 raise errors.JobQueueError("Found job queue version %s, expected %s",
132 version, constants.JOB_QUEUE_VERSION)
133
134 serial = ReadSerial()
135 if serial is None:
136 # Write new serial file
137 utils.WriteFile(pathutils.JOB_QUEUE_SERIAL_FILE,
138 uid=getents.masterd_uid, gid=getents.daemons_gid,
139 mode=constants.JOB_QUEUE_FILES_PERMS,
140 data="%s\n" % 0)
141
142 # Read again
143 serial = ReadSerial()
144
145 if serial is None:
146 # There must be a serious problem
147 raise errors.JobQueueError("Can't read/parse the job queue"
148 " serial file")
149
150 if not must_lock:
151 # There's no need for more error handling. Closing the lock
152 # file below in case of an error will unlock it anyway.
153 queue_lock.Unlock()
154
155 except:
156 queue_lock.Close()
157 raise
158
159 return queue_lock
160
161
162 def CheckDrainFlag():
163 """Check if the queue is marked to be drained.
164
165 This currently uses the queue drain file, which makes it a per-node flag.
166 In the future this can be moved to the config file.
167
168 @rtype: boolean
169 @return: True if the job queue is marked drained
170
171 """
172 return os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
173
174
175 def SetDrainFlag(drain_flag):
176 """Sets the drain flag for the queue.
177
178 @type drain_flag: boolean
179 @param drain_flag: Whether to set or unset the drain flag
180 @attention: This function should only called the current holder of the queue
181 lock
182
183 """
184 getents = runtime.GetEnts()
185
186 if drain_flag:
187 utils.WriteFile(pathutils.JOB_QUEUE_DRAIN_FILE, data="",
188 uid=getents.masterd_uid, gid=getents.daemons_gid,
189 mode=constants.JOB_QUEUE_FILES_PERMS)
190 else:
191 utils.RemoveFile(pathutils.JOB_QUEUE_DRAIN_FILE)
192
193 assert (not drain_flag) ^ CheckDrainFlag()
194
195
196 def FormatJobID(job_id):
197 """Convert a job ID to int format.
198
199 Currently this just is a no-op that performs some checks, but if we
200 want to change the job id format this will abstract this change.
201
202 @type job_id: int or long
203 @param job_id: the numeric job id
204 @rtype: int
205 @return: the formatted job id
206
207 """
208 if not isinstance(job_id, (int, long)):
209 raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
210 if job_id < 0:
211 raise errors.ProgrammerError("Job ID %s is negative" % job_id)
212
213 return job_id
214
215
216 def GetArchiveDirectory(job_id):
217 """Returns the archive directory for a job.
218
219 @type job_id: str
220 @param job_id: Job identifier
221 @rtype: str
222 @return: Directory name
223
224 """
225 return str(ParseJobId(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
226
227
228 def ParseJobId(job_id):
229 """Parses a job ID and converts it to integer.
230
231 """
232 try:
233 return int(job_id)
234 except (ValueError, TypeError):
235 raise errors.ParameterError("Invalid job ID '%s'" % job_id)