Fix socket permissions after master-failover
[ganeti-github.git] / src / Ganeti / Luxi.hs
1 {-# LANGUAGE TemplateHaskell #-}
2
3 {-| Implementation of the Ganeti LUXI interface.
4
5 -}
6
7 {-
8
9 Copyright (C) 2009, 2010, 2011, 2012 Google Inc.
10
11 This program is free software; you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation; either version 2 of the License, or
14 (at your option) any later version.
15
16 This program is distributed in the hope that it will be useful, but
17 WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24 02110-1301, USA.
25
26 -}
27
28 module Ganeti.Luxi
29 ( LuxiOp(..)
30 , LuxiReq(..)
31 , Client
32 , JobId
33 , fromJobId
34 , makeJobId
35 , RecvResult(..)
36 , strOfOp
37 , getClient
38 , getServer
39 , acceptClient
40 , closeClient
41 , closeServer
42 , callMethod
43 , submitManyJobs
44 , queryJobsStatus
45 , buildCall
46 , buildResponse
47 , validateCall
48 , decodeCall
49 , recvMsg
50 , recvMsgExt
51 , sendMsg
52 , allLuxiCalls
53 ) where
54
55 import Control.Exception (catch)
56 import Data.IORef
57 import qualified Data.ByteString as B
58 import qualified Data.ByteString.Lazy as BL
59 import qualified Data.ByteString.UTF8 as UTF8
60 import qualified Data.ByteString.Lazy.UTF8 as UTF8L
61 import Data.Word (Word8)
62 import Control.Monad
63 import Text.JSON (encodeStrict, decodeStrict)
64 import qualified Text.JSON as J
65 import Text.JSON.Pretty (pp_value)
66 import Text.JSON.Types
67 import System.Directory (removeFile)
68 import System.IO (hClose, hFlush, hWaitForInput, Handle, IOMode(..))
69 import System.IO.Error (isEOFError)
70 import System.Posix.Files
71 import System.Timeout
72 import qualified Network.Socket as S
73
74 import Ganeti.BasicTypes
75 import Ganeti.Constants
76 import Ganeti.Errors
77 import Ganeti.JSON
78 import Ganeti.OpParams (pTagsObject)
79 import Ganeti.OpCodes
80 import Ganeti.Runtime
81 import qualified Ganeti.Query.Language as Qlang
82 import Ganeti.THH
83 import Ganeti.Types
84 import Ganeti.Utils
85
86 -- * Utility functions
87
-- | Runs an IO action under a time limit expressed in seconds.
--
-- This wraps 'System.Timeout.timeout'; instead of yielding 'Nothing'
-- when the limit expires, it fails in the IO monad with a message
-- that includes the given description.
withTimeout :: Int -> String -> IO a -> IO a
withTimeout secs descr action =
  -- 'timeout' expects microseconds, hence the scaling
  timeout (secs * 1000000) action >>=
    maybe (fail $ "Timeout in " ++ descr) return
95
96 -- * Generic protocol functionality
97
-- | Outcome of trying to receive one message from the socket.
data RecvResult
  = RecvConnClosed    -- ^ Connection closed
  | RecvError String  -- ^ Any other error
  | RecvOk String     -- ^ Successful receive
  deriving (Show, Eq)
103
-- | Currently supported Luxi operations and JSON serialization.
$(let
    -- Argument list shared by the node, group, network and instance
    -- queries, which all take the same three parameters.
    namedQueryArgs =
      [ simpleField "names"  [t| [String] |]
      , simpleField "fields" [t| [String] |]
      , simpleField "lock"   [t| Bool |]
      ]
    -- Argument list of the requests that take only a job identifier.
    jobArg = [ simpleField "job" [t| JobId |] ]
  in genLuxiOp "LuxiOp"
       [ (luxiReqQuery,
          [ simpleField "what"    [t| Qlang.ItemType |]
          , simpleField "fields"  [t| [String] |]
          , simpleField "qfilter" [t| Qlang.Filter Qlang.FilterField |]
          ])
       , (luxiReqQueryFields,
          [ simpleField "what"   [t| Qlang.ItemType |]
          , simpleField "fields" [t| [String] |]
          ])
       , (luxiReqQueryNodes, namedQueryArgs)
       , (luxiReqQueryGroups, namedQueryArgs)
       , (luxiReqQueryNetworks, namedQueryArgs)
       , (luxiReqQueryInstances, namedQueryArgs)
       , (luxiReqQueryJobs,
          [ simpleField "ids"    [t| [JobId] |]
          , simpleField "fields" [t| [String] |]
          ])
       , (luxiReqQueryExports,
          [ simpleField "nodes" [t| [String] |]
          , simpleField "lock"  [t| Bool |]
          ])
       , (luxiReqQueryConfigValues,
          [ simpleField "fields" [t| [String] |] ])
       , (luxiReqQueryClusterInfo, [])
       , (luxiReqQueryTags,
          [ pTagsObject ])
       , (luxiReqSubmitJob,
          [ simpleField "job" [t| [MetaOpCode] |] ])
       , (luxiReqSubmitManyJobs,
          [ simpleField "ops" [t| [[MetaOpCode]] |] ])
       , (luxiReqWaitForJobChange,
          [ simpleField "job"      [t| JobId |]
          , simpleField "fields"   [t| [String]|]
          , simpleField "prev_job" [t| JSValue |]
          , simpleField "prev_log" [t| JSValue |]
          , simpleField "tmout"    [t| Int |]
          ])
       , (luxiReqArchiveJob, jobArg)
       , (luxiReqAutoArchiveJobs,
          [ simpleField "age"   [t| Int |]
          , simpleField "tmout" [t| Int |]
          ])
       , (luxiReqCancelJob, jobArg)
       , (luxiReqChangeJobPriority,
          [ simpleField "job"      [t| JobId |]
          , simpleField "priority" [t| Int |]
          ])
       , (luxiReqSetDrainFlag,
          [ simpleField "flag" [t| Bool |] ])
       , (luxiReqSetWatcherPause,
          [ simpleField "duration" [t| Double |] ])
       ])
183
-- Template Haskell splice for 'LuxiReq'; 'validateCall' reads a
-- 'LuxiReq' out of the incoming JSON, so this presumably supplies the
-- JSON instance it needs (confirm against "Ganeti.THH").
$(makeJSONInstance ''LuxiReq)
185
-- | List of all defined Luxi calls.
--
-- The name transformation drops the first three characters of each
-- constructor name, i.e. the length of the @Req@ prefix that the
-- 'LuxiReq' constructors carry.
$(genAllConstr (drop (length "Req")) ''LuxiReq "allLuxiCalls")
188
-- | Maps a 'LuxiOp' to the string naming it in messages; 'buildCall'
-- uses this for the method field of a request.
$(genStrOfOp ''LuxiOp "strOfOp")
191
-- | A Luxi call as first received: the request kind together with its
-- arguments, which are still unparsed JSON at this point.
data LuxiCall = LuxiCall LuxiReq JSValue
194
-- | The byte that terminates every message on the wire.
eOM :: Word8
eOM = 0x03
198
-- | The end-of-message marker as a one-byte ByteString, ready to be
-- written to a handle.
bEOM :: B.ByteString
bEOM = B.pack [eOM]
202
-- | Valid keys in the requests and responses.
data MsgKeys
  = Method   -- ^ Request: name of the called method
  | Args     -- ^ Request: arguments of the call
  | Success  -- ^ Response: whether the call succeeded
  | Result   -- ^ Response: payload of the call
208
-- | Maps a 'MsgKeys' value to the string used as the corresponding
-- JSON object key in messages.
$(genStrOfKey ''MsgKeys "strOfKey")
211
-- | Luxi client encapsulation: the connection handle plus the bytes
-- that were read from it but not yet handed out as a message.
data Client = Client
  { socket :: Handle              -- ^ The socket of the client
  , rbuf   :: IORef B.ByteString  -- ^ Already received buffer
  }
216
-- | Connects to the master daemon and returns a luxi Client.
--
-- The argument is the path of the UNIX socket to connect to. The
-- connection attempt is limited to 'luxiDefCtmo' seconds. If it fails
-- or times out, the freshly created socket is closed before the error
-- is re-raised, so a failed attempt does not leak a file descriptor.
getClient :: String -> IO Client
getClient path = do
  s <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
  let doConnect = withTimeout luxiDefCtmo "creating luxi connection" $
                    S.connect s (S.SockAddrUnix path)
  -- 'withTimeout' reports expiry through 'fail', which in IO raises an
  -- IOError, so this handler covers the timeout as well as connection
  -- errors raised as IOError.
  Control.Exception.catch doConnect $ \e -> do
    S.sClose s
    ioError e
  rf <- newIORef B.empty
  h <- S.socketToHandle s ReadWriteMode
  return Client { socket=h, rbuf=rf }
226
-- | Creates and returns a server endpoint bound to the given path.
--
-- When the first argument is 'True', the owner, group and mode of the
-- socket file are adjusted after binding and before the socket starts
-- listening.
getServer :: Bool -> FilePath -> IO S.Socket
getServer setOwner path = do
  sock <- S.socket S.AF_UNIX S.Stream S.defaultProtocol
  S.bindSocket sock $ S.SockAddrUnix path
  when setOwner fixPermissions
  S.listen sock 5 -- 5 is the max backlog
  return sock
  where
    -- Assigns the 'GanetiLuxid' owner and 'DaemonsGroup' group as
    -- resolved by 'setOwnerAndGroupFromNames', then applies
    -- 'luxiSocketPerms' as the file mode.
    fixPermissions = do
      setOwnerAndGroupFromNames path GanetiLuxid $ ExtraGroup DaemonsGroup
      setFileMode path $ fromIntegral luxiSocketPerms
237
-- | Closes a server endpoint and then removes its socket file.
-- FIXME: this should be encapsulated into a nicer type.
closeServer :: FilePath -> S.Socket -> IO ()
closeServer path sock = S.sClose sock >> removeFile path
244
-- | Accepts one client on the given server socket and wraps the new
-- connection, with an empty receive buffer, into a 'Client'.
acceptClient :: S.Socket -> IO Client
acceptClient s = do
  -- accept also yields the peer address, which is not needed here
  (conn, _) <- S.accept s
  buf <- newIORef B.empty
  h <- S.socketToHandle conn ReadWriteMode
  return Client { socket = h, rbuf = buf }
253
-- | Closes the handle underlying the client connection.
closeClient :: Client -> IO ()
closeClient client = hClose (socket client)
257
-- | Sends a message over a luxi transport.
--
-- The string is UTF-8 encoded, followed by the end-of-message marker,
-- and the handle is flushed; the whole operation is limited to
-- 'luxiDefRwto' seconds.
sendMsg :: Client -> String -> IO ()
sendMsg s buf = withTimeout luxiDefRwto "sending luxi message" $ do
  BL.hPut out $ UTF8L.fromString buf
  B.hPut out bEOM
  hFlush out
  where out = socket s
266
-- | Reads from the handle until the end-of-message marker shows up.
--
-- The second argument holds the bytes received so far. The result is
-- the complete message (without the marker) paired with whatever
-- followed the marker in the last chunk read. Each individual read is
-- limited to 'luxiDefRwto' seconds.
recvUpdate :: Handle -> B.ByteString -> IO (B.ByteString, B.ByteString)
recvUpdate handle obuf = do
  chunk <- withTimeout luxiDefRwto "reading luxi response" $
             hWaitForInput handle (-1) >> B.hGetNonBlocking handle 4096
  let (body, rest) = B.break (== eOM) chunk
      collected = obuf `B.append` body
  case B.uncons rest of
    -- no marker in this chunk: keep accumulating
    Nothing -> recvUpdate handle collected
    -- 'rest' starts with the marker itself, which is dropped
    Just (_, leftover) -> return (collected, leftover)
280
-- | Waits for a message over a luxi transport.
--
-- A complete message already sitting in the client's buffer is served
-- from there; otherwise more data is read from the socket. The bytes
-- after the message are stored back into the buffer, and the message
-- is returned decoded from UTF-8.
recvMsg :: Client -> IO String
recvMsg s = do
  let bufRef = rbuf s
  pending <- readIORef bufRef
  (msg, leftover) <-
    case B.break (eOM ==) pending of
      -- no marker buffered yet, so read from the network
      (_, rest) | B.null rest -> recvUpdate (socket s) pending
      -- a whole message is buffered; drop the marker heading 'rest'
      (complete, rest) -> return (complete, B.tail rest)
  writeIORef bufRef leftover
  return $ UTF8.toString msg
292
-- | Variant of 'recvMsg' that reports IO errors as a 'RecvResult'
-- instead of raising them: end-of-file becomes 'RecvConnClosed', any
-- other IO error becomes 'RecvError' carrying its textual form.
recvMsgExt :: Client -> IO RecvResult
recvMsgExt s = Control.Exception.catch receive (return . classify)
  where
    receive = RecvOk `liftM` recvMsg s
    classify e
      | isEOFError e = RecvConnClosed
      | otherwise    = RecvError (show e)
300
-- | Serializes a request into its JSON string form: an object holding
-- the method name and the arguments of the operation.
buildCall :: LuxiOp  -- ^ The method
          -> String  -- ^ The serialized form
buildCall lo =
  encodeStrict $ toJSObject
    [ (strOfKey Method, J.showJSON $ strOfOp lo)
    , (strOfKey Args, opToArgs lo)
    ]
310
-- | Serializes a response into its JSON string form: an object holding
-- the success flag and the result payload.
buildResponse :: Bool     -- ^ Success
              -> JSValue  -- ^ The arguments
              -> String   -- ^ The serialized form
buildResponse success args =
  encodeStrict $ toJSObject
    [ (strOfKey Success, JSBool success)
    , (strOfKey Result, args)
    ]
320
-- | Parses the top-level JSON object of a luxi request and extracts
-- the method and argument keys, failing if parsing fails or either
-- key cannot be read.
validateCall :: String -> Result LuxiCall
validateCall s = do
  msg <- fromJResult "parsing top-level luxi message" (decodeStrict s)
  let fields = fromJSObject (msg :: JSObject JSValue)
  call <- fromObj fields (strOfKey Method)
  args <- fromObj fields (strOfKey Args)
  return $ LuxiCall call args
330
-- | Converts Luxi call arguments into a 'LuxiOp' data structure.
--
-- This is currently hand-coded until we make it more uniform so that
-- it can be generated using TH. Requests whose arguments have the same
-- shape share one of the local decoding helpers below.
decodeCall :: LuxiCall -> Result LuxiOp
decodeCall (LuxiCall call args) =
  case call of
    ReqQueryJobs -> do
      (rawIds, fields) <- fromJVal args
      ids <- listOrNull rawIds
      return $ QueryJobs ids fields
    ReqQueryInstances -> tripleArgs QueryInstances
    ReqQueryNodes -> tripleArgs QueryNodes
    ReqQueryGroups -> tripleArgs QueryGroups
    ReqQueryClusterInfo -> return QueryClusterInfo
    ReqQueryNetworks -> tripleArgs QueryNetworks
    ReqQuery -> tripleArgs Query
    ReqQueryFields -> do
      (what, rawFields) <- fromJVal args
      fields <- listOrNull rawFields
      return $ QueryFields what fields
    ReqSubmitJob -> wrappedList SubmitJob
    ReqSubmitManyJobs -> wrappedList SubmitManyJobs
    ReqWaitForJobChange ->
      -- No JSON instance for 5-tuples, so the five positions are read
      -- one by one
      fromJResult "Parsing WaitForJobChange message" $
        case args of
          JSArray [a, b, c, d, e] ->
            liftM5 WaitForJobChange (J.readJSON a) (J.readJSON b)
                   (J.readJSON c) (J.readJSON d) (J.readJSON e)
          _ -> J.Error "Not enough values"
    ReqArchiveJob -> singleArg ArchiveJob
    ReqAutoArchiveJobs -> pairArgs AutoArchiveJobs
    ReqQueryExports -> pairArgs QueryExports
    ReqQueryConfigValues -> singleArg QueryConfigValues
    ReqQueryTags -> do
      (kind, name) <- fromJVal args
      item <- tagObjectFrom kind name
      return $ QueryTags item
    ReqCancelJob -> singleArg CancelJob
    ReqChangeJobPriority -> pairArgs ChangeJobPriority
    ReqSetDrainFlag -> singleArg SetDrainFlag
    ReqSetWatcherPause -> singleArg SetWatcherPause
  where
    -- A JSON null stands for the empty list; anything else is parsed
    -- as a list.
    listOrNull :: (J.JSON a) => JSValue -> Result [a]
    listOrNull JSNull = return []
    listOrNull v = fromJVal v

    -- Arguments are a one-element list holding the only parameter.
    singleArg :: (J.JSON a) => (a -> LuxiOp) -> Result LuxiOp
    singleArg ctor = do
      [x] <- fromJVal args
      return $ ctor x

    -- Arguments are a pair of parameters.
    pairArgs :: (J.JSON a, J.JSON b) => (a -> b -> LuxiOp) -> Result LuxiOp
    pairArgs ctor = do
      (x, y) <- fromJVal args
      return $ ctor x y

    -- Arguments are a triple of parameters.
    tripleArgs :: (J.JSON a, J.JSON b, J.JSON c)
               => (a -> b -> c -> LuxiOp) -> Result LuxiOp
    tripleArgs ctor = do
      (x, y, z) <- fromJVal args
      return $ ctor x y z

    -- Arguments are a one-element list wrapping a list whose items are
    -- decoded individually, with the request name as error context.
    wrappedList :: (J.JSON a) => ([a] -> LuxiOp) -> Result LuxiOp
    wrappedList ctor = do
      [raw] <- fromJVal args
      items <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) raw
      return $ ctor items
418
-- | Parses a luxi response and checks that the call succeeded.
--
-- A response containing the UTF-8 replacement character is rejected
-- outright. Otherwise the success flag and the result are extracted;
-- a successful response yields the result, a failed one is passed to
-- 'decodeError'.
validateResult :: String -> ErrorResult JSValue
validateResult s = do
  when (any (== UTF8.replacement_char) s) $
    fail "Failed to decode UTF-8, detected replacement char after decoding"
  response <- liftM J.fromJSObject $
                fromJResult "Parsing LUXI response" (decodeStrict s)
  succeeded <- fromObj response (strOfKey Success)
  payload <- fromObj response (strOfKey Result)
  if succeeded
    then return payload
    else decodeError payload
432
-- | Turns the payload of a failed server response into an error.
--
-- The result is always 'Bad': either the error decoded from the
-- payload or, when decoding fails, a 'GenericError' carrying the
-- decoding failure message.
decodeError :: JSValue -> ErrorResult JSValue
decodeError val =
  Bad $ case fromJVal val of
          Ok err  -> err
          Bad msg -> GenericError msg
441
-- | Generic luxi method call: sends the serialized request, waits for
-- one reply message and validates it.
callMethod :: LuxiOp -> Client -> IO (ErrorResult JSValue)
callMethod method s = do
  sendMsg s (buildCall method)
  validateResult `liftM` recvMsg s
449
-- | Parses one job submission result.
--
-- The expected value is a two-element array: a success flag followed
-- by the job identifier on success or an error string on failure.
-- Anything else is reported as an unknown result.
parseSubmitJobResult :: JSValue -> ErrorResult JobId
parseSubmitJobResult v =
  case v of
    JSArray [JSBool True, jid] ->
      case J.readJSON jid of
        J.Ok parsed -> Ok parsed
        J.Error msg -> Bad $ LuxiError msg
    JSArray [JSBool False, JSString reason] ->
      Bad . LuxiError $ fromJSString reason
    _ ->
      Bad . LuxiError $ "Unknown result from the master daemon: " ++
        show (pp_value v)
461
-- | Specialized submitManyJobs call.
--
-- Each entry of the reply array is parsed with 'parseSubmitJobResult';
-- a reply that is not an array is reported as a 'LuxiError'.
submitManyJobs :: Client -> [[MetaOpCode]] -> IO (ErrorResult [JobId])
submitManyJobs s jobs =
  liftM parseReply $ callMethod (SubmitManyJobs jobs) s
  where
    -- map each result (status, payload) pair into a nice Result ADT
    parseReply (Ok (JSArray results)) = mapM parseSubmitJobResult results
    parseReply (Bad err) = Bad err
    parseReply other =
      Bad . LuxiError $
        "Cannot parse response from Ganeti: " ++ show other
472
-- | Custom queryJobs call that asks only for the status field of the
-- given jobs.
--
-- The reply is read as one row of fields per job; a row without any
-- field yields a 'LuxiError', otherwise the first field of every row
-- is returned.
queryJobsStatus :: Client -> [JobId] -> IO (ErrorResult [JobStatus])
queryJobsStatus s jids =
  liftM parseReply $ callMethod (QueryJobs jids ["status"]) s
  where
    parseReply (Bad err) = Bad err
    parseReply (Ok reply) =
      case J.readJSON reply :: J.Result [[JobStatus]] of
        J.Error msg -> Bad $ LuxiError msg
        J.Ok rows -> mapM firstField rows
    -- Picks the single requested field out of a row.
    firstField (status:_) = Ok status
    firstField [] = Bad $ LuxiError "Missing job status field"