Encode UUIDs as ByteStrings
[ganeti-github.git] / src / Ganeti / Query / Server.hs
1 {-# LANGUAGE FlexibleContexts #-}
2
3 {-| Implementation of the Ganeti Query2 server.
4
5 -}
6
7 {-
8
9 Copyright (C) 2012, 2013, 2014 Google Inc.
10 All rights reserved.
11
12 Redistribution and use in source and binary forms, with or without
13 modification, are permitted provided that the following conditions are
14 met:
15
16 1. Redistributions of source code must retain the above copyright notice,
17 this list of conditions and the following disclaimer.
18
19 2. Redistributions in binary form must reproduce the above copyright
20 notice, this list of conditions and the following disclaimer in the
21 documentation and/or other materials provided with the distribution.
22
23 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
24 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
25 TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
26 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
27 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
28 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
29 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34
35 -}
36
37 module Ganeti.Query.Server
38 ( main
39 , checkMain
40 , prepMain
41 ) where
42
43 import Control.Applicative
44 import Control.Concurrent
45 import Control.Exception
46 import Control.Lens ((.~))
47 import Control.Monad (forever, when, mzero, guard, zipWithM, liftM, void)
48 import Control.Monad.Base (MonadBase, liftBase)
49 import Control.Monad.Error (MonadError)
50 import Control.Monad.IO.Class
51 import Control.Monad.Trans (lift)
52 import Control.Monad.Trans.Maybe
53 import qualified Data.ByteString.UTF8 as UTF8
54 import qualified Data.Set as Set (toList)
55 import Data.IORef
56 import Data.List (intersperse)
57 import Data.Maybe (fromMaybe)
58 import qualified Text.JSON as J
59 import Text.JSON (encode, showJSON, JSValue(..))
60 import System.Info (arch)
61 import System.Directory
62 import System.Posix.Process (getProcessID)
63 import System.Posix.Signals as P
64
65 import qualified Ganeti.Constants as C
66 import qualified Ganeti.ConstantUtils as ConstantUtils (unFrozenSet)
67 import Ganeti.Errors
68 import qualified Ganeti.Path as Path
69 import Ganeti.Daemon
70 import Ganeti.Daemon.Utils (handleMasterVerificationOptions)
71 import Ganeti.Objects
72 import Ganeti.Objects.Lens (configFiltersL)
73 import qualified Ganeti.Config as Config
74 import qualified Ganeti.Compat as Compat
75 import Ganeti.ConfigReader
76 import Ganeti.BasicTypes
77 import Ganeti.JQueue
78 import Ganeti.JQScheduler
79 import Ganeti.JSON (TimeAsDoubleJSON(..), alterContainerL, lookupContainer)
80 import Ganeti.Locking.Locks (ClientId(..), ClientType(ClientOther))
81 import Ganeti.Logging
82 import Ganeti.Luxi
83 import qualified Ganeti.Query.Language as Qlang
84 import qualified Ganeti.Query.Cluster as QCluster
85 import Ganeti.Path ( queueDir, jobQueueLockFile, jobQueueDrainFile )
86 import Ganeti.Rpc
87 import qualified Ganeti.Query.Exec as Exec
88 import Ganeti.Query.Query
89 import Ganeti.Query.Filter (makeSimpleFilter)
90 import Ganeti.THH.HsRPC (runRpcClient, RpcClientMonad)
91 import Ganeti.Types
92 import qualified Ganeti.UDSServer as U (Handler(..), listener)
93 import Ganeti.Utils ( lockFile, exitIfBad, exitUnless, watchFile
94 , safeRenameFile, newUUID, isUUID )
95 import Ganeti.Utils.Monad (orM)
96 import Ganeti.Utils.MVarLock
97 import qualified Ganeti.Version as Version
98 import Ganeti.WConfd.Client ( getWConfdClient, withLockedConfig, writeConfig
99 , cleanupLocks)
100
101
-- | Builds a `ClientId` naming the calling luxid thread.
--
-- The identifier embeds the id of the current Haskell thread, so the
-- resulting `ClientId` differs for each request luxid handles (assuming,
-- as the original documentation states, one thread per request). The
-- process id and the livelock file of the job queue status are attached
-- as well.
makeLuxidClientId :: JQStatus -> IO ClientId
makeLuxidClientId status = mkId <$> getProcessID <*> myThreadId
  where
    mkId pid tid =
      ClientId { ciIdentifier = ClientOther ("luxid-" ++ show tid)
               , ciLockFile = jqLivelock status
               , ciPid = pid
               }
116
-- | Connects to WConfd, locks the configuration on behalf of the given
-- client, and runs the supplied WConfd RPC action on the locked
-- configuration.
--
-- luxid uses this whenever it has to modify the configuration.
--
-- Example:
--
-- > cid <- makeLuxidClientId ...
-- > withLockedWconfdConfig cid $ \lockedCfg -> do
-- >   -- some (IO) action that needs to be run inside having the lock
-- >   writeConfig cid (updateConfig lockedCfg)
withLockedWconfdConfig
  :: (MonadBase IO m, MonadError GanetiException m)
  => ClientId
  -> (ConfigData -> RpcClientMonad a)
  -> m a
withLockedWconfdConfig cid f =
  liftBase (Path.defaultWConfdSocket >>= getWConfdClient)
    >>= runRpcClient (withLockedConfig cid False f)
136
-- | Shared implementation of the classic queries.
--
-- Builds an OR filter that matches the requested names against every one
-- of the given filter fields. It then runs the query on the configuration
-- and converts the answer with 'queryCompat'. Synchronous queries are
-- always refused with an 'OpPrereqError'.
handleQuery
  :: [Qlang.ItemType -> Qlang.FilterField]  -- ^ Fields to put into
                                            -- the query
  -> ConfigData                             -- ^ Cluster config
  -> Qlang.ItemType                         -- ^ Query type
  -> [Either String Integer]                -- ^ Requested names
                                            -- (empty means all)
  -> [String]                               -- ^ Requested fields
  -> Bool                                   -- ^ Whether to do sync
                                            -- queries or not
  -> IO (GenericResult GanetiException JSValue)
handleQuery filterFields cfg qkind names fields useSync
  | useSync =
      return . Bad $ OpPrereqError "Sync queries are not allowed" ECodeInval
  | otherwise = do
      let nameFilters = [ makeSimpleFilter (mkField qkind) names
                        | mkField <- filterFields ]
          qry = Qlang.Query qkind fields (Qlang.OrFilter nameFilters)
      qr <- query cfg True qry
      return . fmap showJSON $ qr >>= queryCompat
154
-- | Classic query helper that matches the requested names against both
-- the @name@ and the @uuid@ field.
handleClassicQuery
  :: ConfigData               -- ^ Cluster config
  -> Qlang.ItemType           -- ^ Query type
  -> [Either String Integer]  -- ^ Requested names (empty means all)
  -> [String]                 -- ^ Requested fields
  -> Bool                     -- ^ Whether to do sync queries or not
  -> IO (GenericResult GanetiException JSValue)
handleClassicQuery cfg qkind names fields useSync =
  handleQuery [nameField, uuidField] cfg qkind names fields useSync
165
-- | Classic query helper that matches the requested names against the
-- @uuid@ field only. 'handleClassicQuery' also matches @name@.
handleUuidQuery
  :: ConfigData               -- ^ Cluster config
  -> Qlang.ItemType           -- ^ Query type
  -> [Either String Integer]  -- ^ Requested UUIDs (empty means all)
  -> [String]                 -- ^ Requested fields
  -> Bool                     -- ^ Whether to do sync queries or not
  -> IO (GenericResult GanetiException JSValue)
handleUuidQuery cfg qkind uuids fields useSync =
  handleQuery [uuidField] cfg qkind uuids fields useSync
175
-- | Thin front end for 'handleCall'.
--
-- While no valid configuration is available, every request is answered
-- with a 'ConfigurationError' that carries the reason.
handleCallWrapper :: Lock -> JQStatus -> Result ConfigData
                  -> LuxiOp -> IO (ErrorResult JSValue)
handleCallWrapper qlock qstat cfgResult op =
  case cfgResult of
    Ok config -> handleCall qlock qstat config op
    Bad msg -> return . Bad . ConfigurationError $
      "I do not have access to a valid configuration, cannot\
      \ process queries: " ++ msg
184
-- | Actual luxi operation handler.
--
-- There is one equation per 'LuxiOp' constructor. The 'Lock' and the
-- 'JQStatus' are only used by the equations that deal with the job queue.
handleCall :: Lock -> JQStatus
           -> ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
handleCall _ _ cdata QueryClusterInfo =
  -- The answer embeds the master node name, so failing to determine the
  -- master fails the whole call. Mapping over that result hands the name
  -- straight to the "master" field. The earlier version matched the
  -- result a second time and used an 'undefined' fallback.
  return $ J.makeObj . clusterInfo <$> QCluster.clusterMasterNodeName cdata
  where
    cluster = configCluster cdata
    hypervisors = clusterEnabledHypervisors cluster
    diskTemplates = clusterEnabledDiskTemplates cluster
    -- The first enabled hypervisor is reported as the default one.
    defHv = case hypervisors of
              x:_ -> showJSON x
              [] -> JSNull
    bits = show (Compat.finiteBitSize (0::Int)) ++ "bits"
    archTuple = [bits, arch]
    clusterInfo masterName =
      [ ("software_version", showJSON C.releaseVersion)
      , ("protocol_version", showJSON C.protocolVersion)
      , ("config_version", showJSON C.configVersion)
      , ("os_api_version", showJSON . maximum .
                           Set.toList . ConstantUtils.unFrozenSet $
                           C.osApiVersions)
      , ("export_version", showJSON C.exportVersion)
      , ("vcs_version", showJSON Version.version)
      , ("architecture", showJSON archTuple)
      , ("name", showJSON $ clusterClusterName cluster)
      , ("master", showJSON masterName)
      , ("default_hypervisor", defHv)
      , ("enabled_hypervisors", showJSON hypervisors)
      , ("hvparams", showJSON $ clusterHvparams cluster)
      , ("os_hvp", showJSON $ clusterOsHvp cluster)
      , ("beparams", showJSON $ clusterBeparams cluster)
      , ("osparams", showJSON $ clusterOsparams cluster)
      , ("ipolicy", showJSON $ clusterIpolicy cluster)
      , ("nicparams", showJSON $ clusterNicparams cluster)
      , ("ndparams", showJSON $ clusterNdparams cluster)
      , ("diskparams", showJSON $ clusterDiskparams cluster)
      , ("candidate_pool_size",
         showJSON $ clusterCandidatePoolSize cluster)
      , ("max_running_jobs",
         showJSON $ clusterMaxRunningJobs cluster)
      , ("max_tracked_jobs",
         showJSON $ clusterMaxTrackedJobs cluster)
      , ("mac_prefix", showJSON $ clusterMacPrefix cluster)
      , ("master_netdev", showJSON $ clusterMasterNetdev cluster)
      , ("master_netmask", showJSON $ clusterMasterNetmask cluster)
      , ("use_external_mip_script",
         showJSON $ clusterUseExternalMipScript cluster)
      , ("volume_group_name",
         maybe JSNull showJSON (clusterVolumeGroupName cluster))
      , ("drbd_usermode_helper",
         maybe JSNull showJSON (clusterDrbdUsermodeHelper cluster))
      , ("file_storage_dir", showJSON $ clusterFileStorageDir cluster)
      , ("shared_file_storage_dir",
         showJSON $ clusterSharedFileStorageDir cluster)
      , ("gluster_storage_dir",
         showJSON $ clusterGlusterStorageDir cluster)
      , ("maintain_node_health",
         showJSON $ clusterMaintainNodeHealth cluster)
      , ("ctime", showJSON . TimeAsDoubleJSON $ clusterCtime cluster)
      , ("mtime", showJSON . TimeAsDoubleJSON $ clusterMtime cluster)
      , ("uuid", showJSON $ clusterUuid cluster)
      , ("tags", showJSON $ clusterTags cluster)
      , ("uid_pool", showJSON $ clusterUidPool cluster)
      , ("default_iallocator",
         showJSON $ clusterDefaultIallocator cluster)
      , ("default_iallocator_params",
         showJSON $ clusterDefaultIallocatorParams cluster)
      , ("reserved_lvs", showJSON $ clusterReservedLvs cluster)
      , ("primary_ip_version",
         showJSON . ipFamilyToVersion $ clusterPrimaryIpFamily cluster)
      , ("prealloc_wipe_disks",
         showJSON $ clusterPreallocWipeDisks cluster)
      , ("hidden_os", showJSON $ clusterHiddenOs cluster)
      , ("blacklisted_os", showJSON $ clusterBlacklistedOs cluster)
      , ("enabled_disk_templates", showJSON diskTemplates)
      , ("install_image", showJSON $ clusterInstallImage cluster)
      , ("instance_communication_network",
         showJSON (clusterInstanceCommunicationNetwork cluster))
      , ("zeroing_image", showJSON $ clusterZeroingImage cluster)
      , ("compression_tools",
         showJSON $ clusterCompressionTools cluster)
      , ("enabled_user_shutdown",
         showJSON $ clusterEnabledUserShutdown cluster)
      , ("enabled_data_collectors",
         showJSON . fmap dataCollectorActive
           $ clusterDataCollectors cluster)
      , ("data_collector_interval",
         showJSON . fmap dataCollectorInterval
           $ clusterDataCollectors cluster)
      ]
279
handleCall _ _ cfg (QueryTags kind name) =
  -- Look up the tags of the requested object, according to its kind.
  return . fmap J.showJSON $ case kind of
    TagKindCluster -> Ok . clusterTags $ configCluster cfg
    TagKindGroup -> groupTags <$> Config.getGroup cfg name
    TagKindNode -> nodeTags <$> Config.getNode cfg name
    TagKindInstance -> instTags <$> Config.getInstance cfg name
    TagKindNetwork -> networkTags <$> Config.getNetwork cfg name

handleCall _ _ cfg (Query qkind qfields qfilter) =
  -- Generic query2 request, served directly from the configuration.
  fmap J.showJSON <$> query cfg True (Qlang.Query qkind qfields qfilter)

handleCall _ _ _ (QueryFields qkind qfields) =
  -- Field descriptions do not depend on the configuration.
  return . fmap J.showJSON . queryFields $ Qlang.QueryFields qkind qfields
296
-- Classic queries: the requested names are matched against both the
-- name and the UUID field of the objects.
handleCall _ _ cfg (QueryNodes names fields lock) =
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNode)
                     [Left name | name <- names] fields lock

handleCall _ _ cfg (QueryInstances names fields lock) =
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRInstance)
                     [Left name | name <- names] fields lock

handleCall _ _ cfg (QueryGroups names fields lock) =
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRGroup)
                     [Left name | name <- names] fields lock

handleCall _ _ cfg (QueryJobs names fields) =
  -- Jobs are identified by their numeric id; sync queries are never
  -- requested for them.
  handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
                     [Right . fromIntegral $ fromJobId jid | jid <- names]
                     fields False

handleCall _ _ cfg (QueryFilters uuids fields) =
  -- Filter rules are looked up by UUID only.
  handleUuidQuery cfg (Qlang.ItemTypeLuxi Qlang.QRFilter)
                  [Left uuid | uuid <- uuids] fields False
316
handleCall _ status _ (ReplaceFilter mUuid priority predicates action
                                     reason) =
  -- Serves both the creation of a new filter rule (no UUID given) and the
  -- replacement of an existing one (UUID given).
  runResultT $ do
    -- New rules get a fresh UUID; a supplied UUID `String` must be a
    -- syntactically valid UUID.
    uuid <- maybe (liftIO newUUID) checkUuid mUuid

    luxidReason <- liftIO $
      (,,) "luxid" "" . reasonTrailTimestamp <$> currentTimestamp

    -- The configuration itself is changed by WConfd on our behalf.
    cid <- liftIO $ makeLuxidClientId status
    withLockedWconfdConfig cid $ \lockedCfg -> do
      let mkRule watermark =
            FilterRule { frWatermark = watermark
                       , frPriority = priority
                       , frPredicates = predicates
                       , frAction = action
                       , frReasonTrail = reason ++ [luxidReason]
                       , frUuid = UTF8.fromString uuid
                       }
          storeRule rule =
            writeConfig cid
              . (configFiltersL . alterContainerL uuid .~ Just rule)
              $ lockedCfg
      -- The job id serial is read only once the WConfd lock is held:
      -- waiting for the lock may take a while, and the watermark should
      -- reflect the most recent job id.
      serial <- liftIO readSerialFromDisk
      case serial of
        Ok watermark -> storeRule (mkRule watermark)
        Bad err -> fail $ "AddFilter: reading current JobId failed: " ++ err

    -- Answer with the UUID of the created or replaced rule.
    return $ showJSON uuid
  where
    checkUuid u
      | isUUID u = return u
      | otherwise = fail "Unable to parse UUID"
354
handleCall _ status cfg (DeleteFilter uuid) = runResultT $ do
  -- Refuse to delete a rule that is not present in the configuration.
  _ <- lookupContainer
         (failError $ "Filter rule with UUID " ++ uuid ++ " does not exist")
         uuid
         (configFilters cfg)

  -- Let WConfd drop the rule from the locked configuration.
  cid <- liftIO $ makeLuxidClientId status
  withLockedWconfdConfig cid $
    writeConfig cid . (configFiltersL . alterContainerL uuid .~ Nothing)

  return JSNull
370
handleCall _ _ cfg (QueryNetworks names fields lock) =
  -- Classic query: names are matched against the name and UUID fields.
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNetwork)
                     [Left name | name <- names] fields lock
374
handleCall _ _ cfg (QueryConfigValues fields) =
  -- Each requested field is computed in order; unknown fields yield null.
  Ok . showJSON <$> mapM valueOf fields
  where
    valueOf field = fromMaybe (return JSNull) (lookup field params)
    params :: [(String, IO JSValue)]
    params =
      [ ("cluster_name", return . showJSON . clusterClusterName
                           . configCluster $ cfg)
      , ("watcher_pause", maybe JSNull showJSON <$> QCluster.isWatcherPaused)
      , ("master_node", return . genericResult (const JSNull) showJSON
                          $ QCluster.clusterMasterNodeName cfg)
      , ("drain_flag", showJSON . not <$> isQueueOpen)
      ]
387
handleCall _ _ cfg (QueryExports nodes lock) =
  -- The answer always consists of the "node" and "export" fields.
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRExport)
                     [Left node | node <- nodes] ["node", "export"] lock
391
handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops) = runResultT $ do
  -- Unlike 'SubmitJob', the drain flag is not consulted here. The job is
  -- allocated an id, written and replicated, and then handed to the
  -- scheduler in a background thread.
  let candidates = Config.getMasterCandidates cfg
  jid <- mkResultT $ allocateJobId candidates qlock
  now <- liftIO currentTimestamp
  job <- liftM (extendJobReasonTrail . setReceivedTimestamp now)
               (queuedJobFromOpCodes jid ops)
  qDir <- liftIO queueDir
  _ <- writeAndReplicateJob cfg qDir job
  _ <- liftIO . forkIO $ enqueueNewJobs qstat [job]
  return $ showJSON (fromJobId jid)
401
handleCall qlock qstat cfg (SubmitJob ops) = do
  -- Regular submissions are refused while the queue is drained.
  drained <- not <$> isQueueOpen
  if drained
    then return . Bad $ GenericError "Queue drained"
    else handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops)
408
handleCall qlock qstat cfg (SubmitManyJobs lops) = do
  open <- isQueueOpen
  if open
    then submitAll
    else return . Bad $ GenericError "Queue drained"
  where
    mcs = Config.getMasterCandidates cfg
    -- Allocates ids for all jobs, writes each one to disk, and replicates
    -- and enqueues those that were written successfully. The answer has
    -- one (success, id-or-message) pair per submitted job.
    submitAll = do
      allocated <- allocateJobIds mcs qlock (length lops)
      case allocated of
        Bad s -> return . Bad $ GenericError s
        Ok jids -> do
          ts <- currentTimestamp
          jobs <- liftM (map $ extendJobReasonTrail . setReceivedTimestamp ts)
                    $ zipWithM queuedJobFromOpCodes jids lops
          qDir <- queueDir
          results <- mapM (writeJobToDisk qDir) jobs
          let annotated = zip results jobs
              written = [ job | (res, job) <- annotated, isOk res ]
          when (any isBad results) . logWarning
            $ "Writing some jobs failed " ++ show annotated
          replicateManyJobs qDir mcs written
          _ <- forkIO $ enqueueNewJobs qstat written
          return . Ok . JSArray $ map describe annotated
    describe (res, job)
      | isOk res = showJSON (True, fromJobId $ qjId job)
      | otherwise = showJSON (False, genericResult id (const "") res)
437
handleCall _ _ cfg (WaitForJobChange jid fields prev_job prev_log tmout) = do
  qDir <- queueDir
  let computeFn = computeJobUpdate cfg jid fields prev_log
  -- For a job that is already finalized the update is computed right
  -- away. Otherwise the live job file is watched, with the timeout capped
  -- at 'C.luxiWfjcTimeout'.
  loaded <- loadJobFromDisk qDir False jid
  case loaded of
    Bad s -> return . Bad $ JobLost s
    Ok (job, _)
      | jobFinalized job -> Ok . showJSON <$> computeFn
      | otherwise ->
          Ok . showJSON <$> watchFile (liveJobFile qDir jid)
                                      (min tmout C.luxiWfjcTimeout)
                                      (prev_job, JSArray [])
                                      computeFn
451
handleCall _ _ cfg (SetWatcherPause time) = do
  -- Send the new pause time to the nodes from 'getMasterOrCandidates'
  -- and echo it back (null when no time is given).
  _ <- executeRpcCall (Config.getMasterOrCandidates cfg)
         (RpcCallSetWatcherPause time)
  return . Ok $ maybe JSNull (showJSON . TimeAsDoubleJSON) time
456
handleCall _ _ cfg (SetDrainFlag value) = do
  let mcs = Config.getMasterCandidates cfg
  fpath <- jobQueueDrainFile
  -- The flag is represented by the presence of the drain file. Clearing
  -- it must be idempotent: calling 'removeFile' on a file that is already
  -- absent would throw and fail the whole request.
  if value
    then writeFile fpath ""
    else do
      exists <- doesFileExist fpath
      when exists $ removeFile fpath
  _ <- executeRpcCall mcs $ RpcCallSetDrainFlag value
  return . Ok . showJSON $ True
465
handleCall _ qstat cfg (ChangeJobPriority jid prio) = do
  let jName = "job " ++ show (fromJobId jid)
  maybeJob <- setJobPriority qstat jid prio
  case maybeJob of
    Bad s -> return . Ok $ showJSON (False, s)
    Ok Nothing -> do
      -- No queued job was updated; per the log message the job has
      -- started, so its process is told about the new priority instead.
      logDebug $ jName ++ " started, will signal"
      fmap showJSON <$> tellJobPriority (jqLivelock qstat) jid prio
    Ok (Just job) -> runResultT $ do
      -- The queued job was updated in place; replicate the new version.
      qDir <- liftIO queueDir
      liftIO $ replicateManyJobs qDir (Config.getMasterCandidates cfg) [job]
      return $ showJSON (True, "Priorities of pending opcodes for "
                               ++ jName ++ " have been changed"
                               ++ " to " ++ show prio)
481
handleCall _ qstat cfg (CancelJob jid kill) = do
  let jName = "job " ++ show (fromJobId jid)
  dequeueResult <- dequeueJob qstat jid
  case dequeueResult of
    Bad s -> return . Ok . showJSON $ (False, s)
    Ok True -> do
      -- The job was still queued: mark it as cancelled on disk.
      marked <- runResultT $ do
        logDebug $ jName ++ " dequeued, marking as canceled"
        qDir <- liftIO queueDir
        (job, _) <- ResultT $ loadJobFromDisk qDir True jid
        now <- liftIO currentTimestamp
        writeAndReplicateJob cfg qDir (cancelQueuedJob now job)
      let failed msg = ( False
                       , "Dequeued " ++ jName
                         ++ ", but failed to mark as cancelled: " ++ msg )
          succeeded _ = (True, "Dequeued " ++ jName)
      return . Ok . showJSON $ genericResult failed succeeded marked
    Ok False -> do
      logDebug $ jName ++ " not queued; trying to cancel directly"
      result <- fmap showJSON <$> cancelJob kill (jqLivelock qstat) jid
      when kill . void . forkIO $ cleanupAfterKill
      return result
  where
    -- Runs 'cleanupIfDead' up to 'C.luxidJobDeathDetectionRetries' times,
    -- waiting 'C.luxidJobDeathDelay' between attempts (presumably stopping
    -- at the first success, per 'orM'). Afterwards WConfd is asked to
    -- clean up locks.
    cleanupAfterKill = do
      _ <- orM
             . intersperse (threadDelay C.luxidJobDeathDelay >> return False)
             . replicate C.luxidJobDeathDetectionRetries
             $ cleanupIfDead qstat jid
      wconfdclient <- Path.defaultWConfdSocket >>= getWConfdClient
      void . runResultT $ runRpcClient cleanupLocks wconfdclient
512
handleCall qlock _ cfg (ArchiveJob jid) =
  -- Running the steps in 'MaybeT' lets any of them give up through
  -- 'mzero' (or another 'MonadPlus' primitive). Giving up is answered
  -- with 'False' rather than with an error.
  runResultT . liftM (showJSON . fromMaybe False) . runMaybeT $ do
    qDir <- liftIO queueDir
    let liveFile = liveJobFile qDir jid
        archivedFile = archivedJobFile qDir jid
    withLock qlock $ do
      -- Jobs that cannot be loaded, or are not finalized yet, are skipped.
      (job, _) <- (lift . mkResultT $ loadJobFromDisk qDir False jid)
                    `orElse` mzero
      guard $ jobFinalized job
      lift . withErrorT JobQueueError
           . annotateError "Archiving failed in an unexpected way"
           . mkResultT
           $ safeRenameFile queueDirPermissions liveFile archivedFile
      _ <- liftIO . executeRpcCall (Config.getMasterCandidates cfg)
             $ RpcCallJobqueueRename [(liveFile, archivedFile)]
      return True
531
handleCall qlock _ cfg (AutoArchiveJobs age timeout) = do
  qDir <- queueDir
  jobIds <- getJobIDs [qDir]
  genericResult (return . Bad . JobQueueError . show) archiveAll jobIds
  where
    -- Archive the sorted job ids while holding the job queue lock.
    archiveAll jids =
      Ok . showJSON <$> withLock qlock (archiveJobs cfg age timeout
                                          (sortJobIDs jids))
542
handleCall _ _ _ (PickupJob _) =
  -- This call is reserved for internal use and always refused here.
  return $ Bad (GenericError "Luxi call 'PickupJob' is for internal use only")

{-# ANN handleCall "HLint: ignore Too strict if" #-}
548
-- | Looks up the current state of a job.
--
-- Returns the values of the requested fields and the log entries that
-- are newer than the given previous log number. If the query does not
-- have the expected shape, every field is null and no log entry is
-- returned.
computeJobUpdate :: ConfigData -> JobId -> [String] -> JSValue
                 -> IO (JSValue, JSValue)
computeJobUpdate cfg jid fields prev_log = do
  logDebug $ "Inspecting fields " ++ show fields ++ " of job " ++ sjid
  jobQuery <- handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
                [Right . fromIntegral $ fromJobId jid] ("oplog" : fields) False
  let (rfields, rlogs) =
        case jobQuery of
          Ok (JSArray [JSArray (JSArray logs : answer)]) ->
            ( answer
            , JSArray . filter (isNewer prev_log) $ concatMap entries logs )
          _ -> (map (const JSNull) fields, JSArray [])
  logDebug $ "Updates for job " ++ sjid ++ " are " ++ encode (rfields, rlogs)
  return (JSArray rfields, rlogs)
  where
    sjid = show $ fromJobId jid
    -- Entries of one opcode log; anything that is not an array has none.
    entries (JSArray xs) = xs
    entries _ = []
    -- Without a previous number every entry counts as new. Otherwise an
    -- entry is new when its leading number is larger than the previous one.
    isNewer JSNull (JSArray _) = True
    isNewer (JSRational _ n) (JSArray (JSRational _ m : _)) = n < m
    isNewer _ _ = False
570
571
-- | What a luxi request handler needs: the job queue lock, the job
-- queue status and the action reading the current configuration.
type LuxiConfig = (Lock, JQStatus, ConfigReader)

-- | Serves one luxi request using the configuration read at that moment.
--
-- The 'Bool' in the answer is always 'True'.
luxiExec
  :: LuxiConfig
  -> LuxiOp
  -> IO (Bool, GenericResult GanetiException JSValue)
luxiExec (qlock, qstat, creader) args = do
  config <- creader
  (,) True <$> handleCallWrapper qlock qstat config args
582
-- | The UDS server handler for luxi: how requests are executed, decoded
-- and logged.
luxiHandler :: LuxiConfig -> U.Handler LuxiOp IO JSValue
luxiHandler cfg =
  U.Handler { U.hExec = luxiExec cfg
            , U.hParse = decodeLuxiCall
            , U.hInputLogShort = strOfOp
            , U.hInputLogLong = show
            }
589
-- | What 'prepMain' hands over to 'main': the bound luxi server, the
-- reference holding the current configuration, and the job queue status.
type PrepResult = (Server, IORef (Result ConfigData), JQStatus)
592
-- | Activates the master IP address.
--
-- Loads the cluster configuration from disk and issues the
-- activate-master-IP RPC to the master nodes. RPC errors are only
-- logged; they are not part of the result.
activateMasterIP :: IO (Result ())
activateMasterIP = runResultT $ do
  liftIO $ logDebug "Activating master IP address"
  config <- mkResultT $ Path.clusterConfFile >>= Config.loadConfig
  let netParams = Config.getMasterNetworkParameters config
      useExternalScript = clusterUseExternalMipScript $ configCluster config
  liftIO . logDebug $ "Master IP params: " ++ show netParams
  rpcRes <- liftIO . executeRpcCall (Config.getMasterNodes config)
              $ RpcCallNodeActivateMasterIp netParams useExternalScript
  liftIO $ do
    _ <- logRpcErrors rpcRes
    logDebug "finished activating master IP address"
607
-- | Check function for luxid. It only processes the master verification
-- options.
checkMain :: CheckFn ()
checkMain = handleMasterVerificationOptions
611
-- | Prepare function for luxid.
--
-- Exits unless forking is supported. It then cleans up and binds the
-- luxi socket, creates the configuration reference (initially holding an
-- error) and builds the job queue status on top of it.
prepMain :: PrepFn () PrepResult
prepMain _ _ = do
  forkOk <- Exec.isForkSupported
  exitUnless forkOk "The daemon must be compiled without -threaded"
  socketPath <- Path.defaultQuerySocket
  cleanupSocket socketPath
  server <- describeError "binding to the Luxi socket" Nothing (Just socketPath)
              (getLuxiServer True socketPath)
  configRef <- newIORef (Bad "Configuration not yet loaded")
  jqStatus <- emptyJQStatus configRef
  return (server, configRef, jqStatus)
625
-- | Main function of luxid.
--
-- Subscribes to configuration updates, takes the job queue lock, ignores
-- SIGCHLD, starts the master IP activation in the background and
-- initialises the job scheduler. It then serves luxi requests forever.
-- On exit, the server is closed and the lock file is removed.
main :: MainFn () PrepResult
main _ _ (server, cref, jq) = do
  -- Subscribe to config updates. Each new config replaces the stored one,
  -- and the scheduler is run if the change may affect scheduling (or if
  -- the old or the new config is not valid).
  initConfigReader $ \newConfig -> do
    let needsRescheduling (Ok old) (Ok new) =
          configChangeNeedsRescheduling old new
        needsRescheduling _ _ = True
    reschedule <- atomicModifyIORef cref $ \oldConfig ->
      (newConfig, needsRescheduling oldConfig newConfig)
    when reschedule $ scheduleSomeJobs jq

  qlockFile <- jobQueueLockFile
  _ <- lockFile qlockFile >>= exitIfBad "Failed to obtain the job-queue lock"
  qlock <- newLock

  _ <- P.installHandler P.sigCHLD P.Ignore Nothing

  _ <- forkIO $ void activateMasterIP

  initJQScheduler jq

  let handler = luxiHandler (qlock, jq, readIORef cref)
  forever (U.listener handler server)
    `finally` (closeServer server >> removeFile qlockFile)