Merge branch 'stable-2.16' into stable-2.17
[ganeti-github.git] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2011, 2012, 2013, 2014 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31 """Module for the LUXI protocol
32
33 This module implements the local unix socket protocol. You only need
34 this module and the opcodes module in the client program in order to
35 communicate with the master.
36
37 The module is also used by the master daemon.
38
39 """
40
41 from ganeti import constants
42 from ganeti import pathutils
43 from ganeti import objects
44 import ganeti.rpc.client as cl
45 from ganeti.rpc.errors import RequestError
46 from ganeti.rpc.transport import Transport
47
# Public API of this module: only the Client class is exported.
__all__ = [
  # classes:
  "Client"
  ]

# Short module-level aliases for the LUXI request names defined in
# ganeti.constants. The Client methods below pass most of them as the
# request name to CallMethod.
REQ_SUBMIT_JOB = constants.LUXI_REQ_SUBMIT_JOB
REQ_SUBMIT_JOB_TO_DRAINED_QUEUE = constants.LUXI_REQ_SUBMIT_JOB_TO_DRAINED_QUEUE
REQ_SUBMIT_MANY_JOBS = constants.LUXI_REQ_SUBMIT_MANY_JOBS
REQ_PICKUP_JOB = constants.LUXI_REQ_PICKUP_JOB
REQ_WAIT_FOR_JOB_CHANGE = constants.LUXI_REQ_WAIT_FOR_JOB_CHANGE
REQ_CANCEL_JOB = constants.LUXI_REQ_CANCEL_JOB
REQ_ARCHIVE_JOB = constants.LUXI_REQ_ARCHIVE_JOB
REQ_CHANGE_JOB_PRIORITY = constants.LUXI_REQ_CHANGE_JOB_PRIORITY
REQ_AUTO_ARCHIVE_JOBS = constants.LUXI_REQ_AUTO_ARCHIVE_JOBS
REQ_QUERY = constants.LUXI_REQ_QUERY
REQ_QUERY_FIELDS = constants.LUXI_REQ_QUERY_FIELDS
REQ_QUERY_JOBS = constants.LUXI_REQ_QUERY_JOBS
REQ_QUERY_FILTERS = constants.LUXI_REQ_QUERY_FILTERS
REQ_REPLACE_FILTER = constants.LUXI_REQ_REPLACE_FILTER
REQ_DELETE_FILTER = constants.LUXI_REQ_DELETE_FILTER
REQ_QUERY_INSTANCES = constants.LUXI_REQ_QUERY_INSTANCES
REQ_QUERY_NODES = constants.LUXI_REQ_QUERY_NODES
REQ_QUERY_GROUPS = constants.LUXI_REQ_QUERY_GROUPS
REQ_QUERY_NETWORKS = constants.LUXI_REQ_QUERY_NETWORKS
REQ_QUERY_EXPORTS = constants.LUXI_REQ_QUERY_EXPORTS
REQ_QUERY_CONFIG_VALUES = constants.LUXI_REQ_QUERY_CONFIG_VALUES
REQ_QUERY_CLUSTER_INFO = constants.LUXI_REQ_QUERY_CLUSTER_INFO
REQ_QUERY_TAGS = constants.LUXI_REQ_QUERY_TAGS
REQ_SET_DRAIN_FLAG = constants.LUXI_REQ_SET_DRAIN_FLAG
REQ_SET_WATCHER_PAUSE = constants.LUXI_REQ_SET_WATCHER_PAUSE
REQ_ALL = constants.LUXI_REQ_ALL

# Timeout values taken from ganeti.constants. DEF_RWTO is the base from
# which Client.AutoArchiveJobs derives its timeout; WFJC_TIMEOUT is both the
# default and the upper bound for Client.WaitForJobChangeOnce.
DEF_RWTO = constants.LUXI_DEF_RWTO
WFJC_TIMEOUT = constants.LUXI_WFJC_TIMEOUT
82
83
class Client(cl.AbstractClient):
  """High-level client implementation.

  This uses a backing Transport-like class on top of which it
  implements data serialization/deserialization.

  The request methods below are thin wrappers: each packs its arguments
  into a tuple and hands it, together with the matching C{REQ_*} request
  name, to C{CallMethod}. That method is not defined in this class
  (presumably it is inherited from L{cl.AbstractClient}).

  """
  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments are the same as for L{AbstractClient}.

    @param address: socket address to use; when C{None},
        L{pathutils.QUERY_SOCKET} is used instead

    """
    super(Client, self).__init__(timeouts, transport)
    # Override the version of the protocol:
    self.version = constants.LUXI_VERSION
    # Store the socket address
    if address is None:
      address = pathutils.QUERY_SOCKET
    self.address = address
    self._InitTransport()

  def _GetAddress(self):
    """Returns the socket address stored by the constructor.

    """
    return self.address

  def SetQueueDrainFlag(self, drain_flag):
    """Issues a L{REQ_SET_DRAIN_FLAG} request with the given flag.

    """
    return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))

  def SetWatcherPause(self, until):
    """Issues a L{REQ_SET_WATCHER_PAUSE} request with the given value.

    """
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))

  def PickupJob(self, job):
    """Issues a L{REQ_PICKUP_JOB} request for the given job.

    """
    return self.CallMethod(REQ_PICKUP_JOB, (job,))

  def SubmitJob(self, ops):
    """Issues a L{REQ_SUBMIT_JOB} request for a list of operations.

    Each element of C{ops} is serialized with C{ToDict(_with_private=True)}
    when it is an L{objects.ConfigObject}, and with C{__getstate__()}
    otherwise.

    @param ops: iterable of operations making up the job

    """
    # NOTE(review): only this method special-cases ConfigObject; the other
    # two submit methods always use __getstate__() -- confirm this is intended
    ops_state = [op.__getstate__()
                 if not isinstance(op, objects.ConfigObject)
                 else op.ToDict(_with_private=True)
                 for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))

  def SubmitJobToDrainedQueue(self, ops):
    """Issues a L{REQ_SUBMIT_JOB_TO_DRAINED_QUEUE} request.

    @param ops: iterable of operations; each is serialized with
        C{__getstate__()}

    """
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB_TO_DRAINED_QUEUE, (ops_state, ))

  def SubmitManyJobs(self, jobs):
    """Issues a L{REQ_SUBMIT_MANY_JOBS} request.

    @param jobs: iterable of jobs, each being an iterable of operations
        that are serialized with C{__getstate__()}

    """
    jobs_state = [[op.__getstate__() for op in ops] for ops in jobs]
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))

  @staticmethod
  def _PrepareJobId(request_name, job_id):
    """Converts a job ID to an integer.

    @param request_name: name of the request; only used in the error message
    @param job_id: value to be converted with C{int()}
    @rtype: int
    @raise RequestError: if C{job_id} can not be converted to an integer

    """
    try:
      return int(job_id)
    except (TypeError, ValueError):
      # int() signals unsupported types (e.g. None or a list) with TypeError
      # rather than ValueError; both mean the caller passed an invalid job ID
      raise RequestError("Invalid parameter passed to %s as job id: "
                         " expected integer, got value %s" %
                         (request_name, job_id))

  def CancelJob(self, job_id, kill=False):
    """Issues a L{REQ_CANCEL_JOB} request.

    @param job_id: job ID; converted to an integer before sending
    @param kill: passed through unchanged as second request argument
    @raise RequestError: if C{job_id} is not convertible to an integer

    """
    job_id = Client._PrepareJobId(REQ_CANCEL_JOB, job_id)
    return self.CallMethod(REQ_CANCEL_JOB, (job_id, kill))

  def ArchiveJob(self, job_id):
    """Issues a L{REQ_ARCHIVE_JOB} request.

    @param job_id: job ID; converted to an integer before sending
    @raise RequestError: if C{job_id} is not convertible to an integer

    """
    job_id = Client._PrepareJobId(REQ_ARCHIVE_JOB, job_id)
    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))

  def ChangeJobPriority(self, job_id, priority):
    """Issues a L{REQ_CHANGE_JOB_PRIORITY} request.

    @param job_id: job ID; converted to an integer before sending
    @param priority: passed through unchanged as second request argument
    @raise RequestError: if C{job_id} is not convertible to an integer

    """
    job_id = Client._PrepareJobId(REQ_CHANGE_JOB_PRIORITY, job_id)
    return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))

  def AutoArchiveJobs(self, age):
    """Issues a L{REQ_AUTO_ARCHIVE_JOBS} request.

    @param age: passed through unchanged as first request argument

    """
    # Floor division keeps the timeout integral even where "/" is true
    # division; for integers under classic division the result is unchanged.
    # (assumes DEF_RWTO is an integer -- TODO confirm)
    timeout = (DEF_RWTO - 1) // 2
    return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial,
                           timeout=WFJC_TIMEOUT):
    """Waits for changes on a job.

    Unlike L{WaitForJobChange}, the job ID is sent as given, without
    conversion to an integer.

    @param job_id: Job ID
    @type fields: list
    @param fields: List of field names to be observed
    @type prev_job_info: None or list
    @param prev_job_info: Previously received job information
    @type prev_log_serial: None or int/long
    @param prev_log_serial: Highest log serial number previously received
    @type timeout: int/float
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
                    be capped to that value)

    """
    assert timeout >= 0, "Timeout can not be negative"
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                           (job_id, fields, prev_job_info,
                            prev_log_serial,
                            min(WFJC_TIMEOUT, timeout)))

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    """Waits until a job change is reported.

    Calls L{WaitForJobChangeOnce} with the default timeout repeatedly, for
    as long as it returns L{constants.JOB_NOTCHANGED}.

    @param job_id: job ID; converted to an integer before the first call
    @return: the first result that is not L{constants.JOB_NOTCHANGED}
    @raise RequestError: if C{job_id} is not convertible to an integer

    """
    job_id = Client._PrepareJobId(REQ_WAIT_FOR_JOB_CHANGE, job_id)
    while True:
      result = self.WaitForJobChangeOnce(job_id, fields,
                                         prev_job_info, prev_log_serial)
      if result != constants.JOB_NOTCHANGED:
        return result

  def Query(self, what, fields, qfilter):
    """Query for resources/items.

    @param what: One of L{constants.QR_VIA_LUXI}
    @type fields: List of strings
    @param fields: List of requested fields
    @type qfilter: None or list
    @param qfilter: Query filter
    @rtype: L{objects.QueryResponse}

    """
    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
    return objects.QueryResponse.FromDict(result)

  def QueryFields(self, what, fields):
    """Query for available fields.

    @param what: One of L{constants.QR_VIA_LUXI}
    @type fields: None or list of strings
    @param fields: List of requested fields
    @rtype: L{objects.QueryFieldsResponse}

    """
    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
    return objects.QueryFieldsResponse.FromDict(result)

  def QueryJobs(self, job_ids, fields):
    """Issues a L{REQ_QUERY_JOBS} request and returns its raw result.

    """
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

  def QueryFilters(self, uuids, fields):
    """Issues a L{REQ_QUERY_FILTERS} request and returns its raw result.

    """
    return self.CallMethod(REQ_QUERY_FILTERS, (uuids, fields))

  def ReplaceFilter(self, uuid, priority, predicates, action, reason):
    """Issues a L{REQ_REPLACE_FILTER} request with the given arguments.

    """
    return self.CallMethod(REQ_REPLACE_FILTER,
                           (uuid, priority, predicates, action, reason))

  def DeleteFilter(self, uuid):
    """Issues a L{REQ_DELETE_FILTER} request for the given UUID.

    """
    return self.CallMethod(REQ_DELETE_FILTER, (uuid, ))

  def QueryInstances(self, names, fields, use_locking):
    """Issues a L{REQ_QUERY_INSTANCES} request and returns its raw result.

    """
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

  def QueryNodes(self, names, fields, use_locking):
    """Issues a L{REQ_QUERY_NODES} request and returns its raw result.

    """
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

  def QueryGroups(self, names, fields, use_locking):
    """Issues a L{REQ_QUERY_GROUPS} request and returns its raw result.

    """
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))

  def QueryNetworks(self, names, fields, use_locking):
    """Issues a L{REQ_QUERY_NETWORKS} request and returns its raw result.

    """
    return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))

  def QueryExports(self, nodes, use_locking):
    """Issues a L{REQ_QUERY_EXPORTS} request and returns its raw result.

    """
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

  def QueryClusterInfo(self):
    """Issues a L{REQ_QUERY_CLUSTER_INFO} request without arguments.

    """
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

  def QueryConfigValues(self, fields):
    """Issues a L{REQ_QUERY_CONFIG_VALUES} request for the given fields.

    """
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))

  def QueryTags(self, kind, name):
    """Issues a L{REQ_QUERY_TAGS} request for the given kind and name.

    """
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))