Store keys as ByteStrings
[ganeti-github.git] / src / Ganeti / Query / Server.hs
1 {-# LANGUAGE FlexibleContexts #-}
2
3 {-| Implementation of the Ganeti Query2 server.
4
5 -}
6
7 {-
8
9 Copyright (C) 2012, 2013, 2014 Google Inc.
10 All rights reserved.
11
12 Redistribution and use in source and binary forms, with or without
13 modification, are permitted provided that the following conditions are
14 met:
15
16 1. Redistributions of source code must retain the above copyright notice,
17 this list of conditions and the following disclaimer.
18
19 2. Redistributions in binary form must reproduce the above copyright
20 notice, this list of conditions and the following disclaimer in the
21 documentation and/or other materials provided with the distribution.
22
23 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
24 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
25 TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
26 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
27 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
28 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
29 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34
35 -}
36
37 module Ganeti.Query.Server
38 ( main
39 , checkMain
40 , prepMain
41 ) where
42
import Control.Applicative
import Control.Concurrent
import Control.Exception
import Control.Lens ((.~))
import Control.Monad (forever, when, mzero, guard, zipWithM, liftM, void)
import Control.Monad.Base (MonadBase, liftBase)
import Control.Monad.Error (MonadError)
import Control.Monad.IO.Class
import Control.Monad.Trans (lift)
import Control.Monad.Trans.Maybe
import qualified Data.ByteString.UTF8 as UTF8
import qualified Data.Set as Set (toList)
import Data.IORef
import Data.List (intersperse)
import Data.Maybe (fromMaybe)
import qualified Text.JSON as J
import Text.JSON (encode, showJSON, JSValue(..))
import System.Info (arch)
import System.Directory
import System.IO.Error (isDoesNotExistError)
import System.Posix.Process (getProcessID)
import System.Posix.Signals as P

import qualified Ganeti.Constants as C
import qualified Ganeti.ConstantUtils as ConstantUtils (unFrozenSet)
import Ganeti.Errors
import qualified Ganeti.Path as Path
import Ganeti.Daemon
import Ganeti.Daemon.Utils (handleMasterVerificationOptions)
import Ganeti.Objects
import Ganeti.Objects.Lens (configFiltersL)
import qualified Ganeti.Config as Config
import qualified Ganeti.Compat as Compat
import Ganeti.ConfigReader
import Ganeti.BasicTypes
import Ganeti.JQueue
import Ganeti.JQScheduler
import Ganeti.JSON (TimeAsDoubleJSON(..), alterContainerL, lookupContainer)
import Ganeti.Locking.Locks (ClientId(..), ClientType(ClientOther))
import Ganeti.Logging
import Ganeti.Luxi
import qualified Ganeti.Query.Language as Qlang
import qualified Ganeti.Query.Cluster as QCluster
import Ganeti.Path ( queueDir, jobQueueLockFile, jobQueueDrainFile )
import Ganeti.Rpc
import qualified Ganeti.Query.Exec as Exec
import Ganeti.Query.Query
import Ganeti.Query.Filter (makeSimpleFilter)
import Ganeti.THH.HsRPC (runRpcClient, RpcClientMonad)
import Ganeti.Types
import qualified Ganeti.UDSServer as U (Handler(..), listener)
import Ganeti.Utils ( lockFile, exitIfBad, exitUnless, watchFile
                    , safeRenameFile, newUUID, isUUID )
import Ganeti.Utils.Monad (orM)
import Ganeti.Utils.MVarLock
import qualified Ganeti.Version as Version
import Ganeti.WConfd.Client ( getWConfdClient, withLockedConfig, writeConfig
                            , cleanupLocks)
100
101
-- | Builds the 'ClientId' of the luxid (process, thread) pair that is
-- currently running.
--
-- The calling thread's id is part of the identifier, so the resulting
-- 'ClientId' differs for each request handled by luxid.
makeLuxidClientId :: JQStatus -> IO ClientId
makeLuxidClientId status = do
  threadId <- myThreadId
  processId <- getProcessID
  return ClientId { ciIdentifier = ClientOther ("luxid-" ++ show threadId)
                  , ciLockFile = jqLivelock status
                  , ciPid = processId
                  }
116
-- | Connects to WConfd and runs a WConfd RPC computation on the
-- configuration obtained through 'withLockedConfig'.
--
-- luxid uses this whenever it has to modify the configuration.
--
-- Example:
--
-- > cid <- makeLuxidClientId ...
-- > withLockedWconfdConfig cid $ \lockedCfg -> do
-- >   -- some (IO) action that needs to be run inside having the lock
-- >   writeConfig cid (updateConfig lockedCfg)
withLockedWconfdConfig
  :: (MonadBase IO m, MonadError GanetiException m)
  => ClientId
  -> (ConfigData -> RpcClientMonad a)
  -> m a
withLockedWconfdConfig cid f = do
  socketPath <- liftBase Path.defaultWConfdSocket
  client <- liftBase $ getWConfdClient socketPath
  runRpcClient (withLockedConfig cid False f) client
136
-- | Helper for classic queries.
--
-- Builds an 'Qlang.OrFilter' that matches the requested names against
-- every given filter field, runs the query and converts the result to the
-- compatibility format. Sync queries are rejected with an
-- 'OpPrereqError'.
handleQuery :: [Qlang.ItemType -> Qlang.FilterField] -- ^ Fields to put into
                                                     -- the query
            -> ConfigData              -- ^ Cluster config
            -> Qlang.ItemType          -- ^ Query type
            -> [Either String Integer] -- ^ Requested names
                                       -- (empty means all)
            -> [String]                -- ^ Requested fields
            -> Bool                    -- ^ Whether to do sync queries or not
            -> IO (GenericResult GanetiException JSValue)
handleQuery filterFields cfg qkind names fields sync
  | sync =
      return . Bad $ OpPrereqError "Sync queries are not allowed" ECodeInval
  | otherwise = do
      let nameFilterFor mkField = makeSimpleFilter (mkField qkind) names
          combined = Qlang.OrFilter (map nameFilterFor filterFields)
      result <- query cfg True (Qlang.Query qkind fields combined)
      return . fmap showJSON $ result >>= queryCompat
154
-- | Classic query helper that matches the requested names against both
-- the @name@ and the @uuid@ field.
handleClassicQuery :: ConfigData              -- ^ Cluster config
                   -> Qlang.ItemType          -- ^ Query type
                   -> [Either String Integer] -- ^ Requested names
                                              -- (empty means all)
                   -> [String]                -- ^ Requested fields
                   -> Bool                    -- ^ Whether to do sync queries
                                              -- or not
                   -> IO (GenericResult GanetiException JSValue)
handleClassicQuery = handleQuery byNameOrUuid
  where byNameOrUuid = [nameField, uuidField]
165
-- | Variant of 'handleClassicQuery' that matches the requested names
-- against the @uuid@ field only.
handleUuidQuery :: ConfigData              -- ^ Cluster config
                -> Qlang.ItemType          -- ^ Query type
                -> [Either String Integer] -- ^ Requested names
                                           -- (empty means all)
                -> [String]                -- ^ Requested fields
                -> Bool                    -- ^ Whether to do sync queries
                                           -- or not
                -> IO (GenericResult GanetiException JSValue)
handleUuidQuery = handleQuery byUuid
  where byUuid = [uuidField]
175
-- | Dispatches to 'handleCall' when a configuration is available.
-- Otherwise it answers with a 'ConfigurationError' that carries the
-- reason the configuration is missing.
handleCallWrapper :: Lock -> JQStatus -> Result ConfigData
                  -> LuxiOp -> IO (ErrorResult JSValue)
handleCallWrapper qlock qstat cfgResult op =
  case cfgResult of
    Ok config -> handleCall qlock qstat config op
    Bad msg -> return . Bad . ConfigurationError $
      "I do not have access to a valid configuration, cannot\
      \ process queries: " ++ msg
184
-- | Actual luxi operation handler.
--
-- Each equation handles one kind of 'LuxiOp'. The handlers use the given
-- job-queue lock, the job queue status and the cluster configuration.
handleCall :: Lock -> JQStatus
           -> ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
handleCall _ _ cdata QueryClusterInfo =
  -- The answer embeds the master node name. If that lookup fails, its
  -- error is returned unchanged, so the object is only ever built from a
  -- real master name (no partial placeholder is needed).
  let cluster = configCluster cdata
      hypervisors = clusterEnabledHypervisors cluster
      diskTemplates = clusterEnabledDiskTemplates cluster
      def_hv = case hypervisors of
                 x:_ -> showJSON x
                 [] -> JSNull
      bits = show (Compat.finiteBitSize (0::Int)) ++ "bits"
      arch_tuple = [bits, arch]
      mkInfo master = J.makeObj
        [ ("software_version", showJSON C.releaseVersion)
        , ("protocol_version", showJSON C.protocolVersion)
        , ("config_version", showJSON C.configVersion)
        , ("os_api_version", showJSON . maximum .
                             Set.toList . ConstantUtils.unFrozenSet $
                             C.osApiVersions)
        , ("export_version", showJSON C.exportVersion)
        , ("vcs_version", showJSON Version.version)
        , ("architecture", showJSON arch_tuple)
        , ("name", showJSON $ clusterClusterName cluster)
        , ("master", showJSON master)
        , ("default_hypervisor", def_hv)
        , ("enabled_hypervisors", showJSON hypervisors)
        , ("hvparams", showJSON $ clusterHvparams cluster)
        , ("os_hvp", showJSON $ clusterOsHvp cluster)
        , ("beparams", showJSON $ clusterBeparams cluster)
        , ("osparams", showJSON $ clusterOsparams cluster)
        , ("ipolicy", showJSON $ clusterIpolicy cluster)
        , ("nicparams", showJSON $ clusterNicparams cluster)
        , ("ndparams", showJSON $ clusterNdparams cluster)
        , ("diskparams", showJSON $ clusterDiskparams cluster)
        , ("candidate_pool_size",
           showJSON $ clusterCandidatePoolSize cluster)
        , ("max_running_jobs",
           showJSON $ clusterMaxRunningJobs cluster)
        , ("max_tracked_jobs",
           showJSON $ clusterMaxTrackedJobs cluster)
        , ("mac_prefix", showJSON $ clusterMacPrefix cluster)
        , ("master_netdev", showJSON $ clusterMasterNetdev cluster)
        , ("master_netmask", showJSON $ clusterMasterNetmask cluster)
        , ("use_external_mip_script",
           showJSON $ clusterUseExternalMipScript cluster)
        , ("volume_group_name",
           maybe JSNull showJSON (clusterVolumeGroupName cluster))
        , ("drbd_usermode_helper",
           maybe JSNull showJSON (clusterDrbdUsermodeHelper cluster))
        , ("file_storage_dir", showJSON $ clusterFileStorageDir cluster)
        , ("shared_file_storage_dir",
           showJSON $ clusterSharedFileStorageDir cluster)
        , ("gluster_storage_dir",
           showJSON $ clusterGlusterStorageDir cluster)
        , ("maintain_node_health",
           showJSON $ clusterMaintainNodeHealth cluster)
        , ("ctime", showJSON . TimeAsDoubleJSON $ clusterCtime cluster)
        , ("mtime", showJSON . TimeAsDoubleJSON $ clusterMtime cluster)
        , ("uuid", showJSON $ clusterUuid cluster)
        , ("tags", showJSON $ clusterTags cluster)
        , ("uid_pool", showJSON $ clusterUidPool cluster)
        , ("default_iallocator",
           showJSON $ clusterDefaultIallocator cluster)
        , ("default_iallocator_params",
           showJSON $ clusterDefaultIallocatorParams cluster)
        , ("reserved_lvs", showJSON $ clusterReservedLvs cluster)
        , ("primary_ip_version",
           showJSON . ipFamilyToVersion $ clusterPrimaryIpFamily cluster)
        , ("prealloc_wipe_disks",
           showJSON $ clusterPreallocWipeDisks cluster)
        , ("hidden_os", showJSON $ clusterHiddenOs cluster)
        , ("blacklisted_os", showJSON $ clusterBlacklistedOs cluster)
        , ("enabled_disk_templates", showJSON diskTemplates)
        , ("install_image", showJSON $ clusterInstallImage cluster)
        , ("instance_communication_network",
           showJSON (clusterInstanceCommunicationNetwork cluster))
        , ("zeroing_image", showJSON $ clusterZeroingImage cluster)
        , ("compression_tools",
           showJSON $ clusterCompressionTools cluster)
        , ("enabled_user_shutdown",
           showJSON $ clusterEnabledUserShutdown cluster)
        , ("enabled_data_collectors",
           showJSON . fmap dataCollectorActive
             $ clusterDataCollectors cluster)
        , ("data_collector_interval",
           showJSON . fmap dataCollectorInterval
             $ clusterDataCollectors cluster)
        ]
  in return $ mkInfo <$> QCluster.clusterMasterNodeName cdata
279
-- Tags of the cluster or of a named group/node/instance/network. A failed
-- lookup of the named object is returned as the error.
handleCall _ _ cfg (QueryTags kind name) =
  return . fmap J.showJSON $ tagsOf kind
  where
    tagsOf TagKindCluster = Ok . clusterTags $ configCluster cfg
    tagsOf TagKindGroup = groupTags <$> Config.getGroup cfg name
    tagsOf TagKindNode = nodeTags <$> Config.getNode cfg name
    tagsOf TagKindInstance = instTags <$> Config.getInstance cfg name
    tagsOf TagKindNetwork = networkTags <$> Config.getNetwork cfg name

-- Generic Query2 request.
handleCall _ _ cfg (Query qkind qfields qfilter) =
  fmap J.showJSON <$> query cfg True (Qlang.Query qkind qfields qfilter)

-- Field description request; does not need the configuration.
handleCall _ _ _ (QueryFields qkind qfields) =
  return . fmap J.showJSON . queryFields $ Qlang.QueryFields qkind qfields
296
-- Classic listing requests. Object names are passed as strings ('Left').
-- Job ids are passed as numbers ('Right'). The job and filter variants
-- never request sync queries.
handleCall _ _ cfg (QueryNodes names fields lock) =
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNode)
    (Left <$> names) fields lock

handleCall _ _ cfg (QueryInstances names fields lock) =
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRInstance)
    (Left <$> names) fields lock

handleCall _ _ cfg (QueryGroups names fields lock) =
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRGroup)
    (Left <$> names) fields lock

handleCall _ _ cfg (QueryJobs names fields) =
  handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
    [Right . fromIntegral $ fromJobId jobId | jobId <- names] fields False

handleCall _ _ cfg (QueryFilters uuids fields) =
  handleUuidQuery cfg (Qlang.ItemTypeLuxi Qlang.QRFilter)
    (Left <$> uuids) fields False
316
-- Adds a filter rule or replaces an existing one, then returns its UUID.
-- If no UUID is given, a fresh one is generated with 'newUUID'. Otherwise
-- the given string must pass 'isUUID'. The rule is stored under that UUID
-- in the configuration written through WConfd ('withLockedWconfdConfig').
-- Its watermark is the job serial read from disk while inside that call.
-- NOTE(review): the serial-read error message says "AddFilter" even when an
-- existing rule is being replaced.
handleCall _ status _ (ReplaceFilter mUuid priority predicates action
                                     reason) =
  -- Handles both adding new filter and changing existing ones.
  runResultT $ do

    -- Check that uuid `String` is actually a UUID.
    uuid <- case mUuid of
      Nothing -> liftIO newUUID -- Request to add a new filter
      Just u -- Request to edit an existing filter
        | isUUID u -> return u
        | otherwise -> fail "Unable to parse UUID"

    -- The rule's reason trail is extended with an entry naming luxid.
    timestamp <- liftIO $ reasonTrailTimestamp <$> currentTimestamp
    let luxidReason = ("luxid", "", timestamp)

    -- Ask WConfd to change the config for us.
    cid <- liftIO $ makeLuxidClientId status
    withLockedWconfdConfig cid $ \lockedCfg -> do
      -- Reading the latest JobID inside the Wconfd lock to really get the
      -- most recent one (locking may block us for some time).
      serial <- liftIO readSerialFromDisk
      case serial of
        Bad err -> fail $ "AddFilter: reading current JobId failed: " ++ err
        Ok watermark -> do
          let rule = FilterRule { frWatermark = watermark
                                , frPriority = priority
                                , frPredicates = predicates
                                , frAction = action
                                , frReasonTrail = reason ++ [luxidReason]
                                , frUuid = UTF8.fromString uuid
                                }
          -- Insert or overwrite the entry keyed by the UUID.
          writeConfig cid
            . (configFiltersL . alterContainerL (UTF8.fromString uuid)
                 .~ Just rule)
            $ lockedCfg

    -- Return UUID of added/replaced filter.
    return $ showJSON uuid
355
-- Removes the filter rule stored under the given UUID. It fails if the
-- current configuration has no such rule; otherwise it answers 'JSNull'.
handleCall _ status cfg (DeleteFilter uuid) = runResultT $ do
  let key = UTF8.fromString uuid
  -- Refuse to delete a rule that does not exist.
  _ <- lookupContainer
         (failError $ "Filter rule with UUID " ++ uuid ++ " does not exist")
         key
         (configFilters cfg)
  -- Ask WConfd to change the config for us.
  cid <- liftIO $ makeLuxidClientId status
  withLockedWconfdConfig cid $
    writeConfig cid . (configFiltersL . alterContainerL key .~ Nothing)
  return JSNull
371
handleCall _ _ cfg (QueryNetworks names fields lock) =
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNetwork)
    (Left <$> names) fields lock

-- Answers a fixed set of configuration values, in the order requested.
-- Unknown field names yield 'JSNull'.
handleCall _ _ cfg (QueryConfigValues fields) =
  Ok . showJSON <$> mapM valueOf fields
  where
    valueOf :: String -> IO JSValue
    valueOf field = fromMaybe (return JSNull) (lookup field configValues)

    configValues :: [(String, IO JSValue)]
    configValues =
      [ ("cluster_name", return . showJSON . clusterClusterName
                           . configCluster $ cfg)
      , ("watcher_pause", maybe JSNull showJSON <$> QCluster.isWatcherPaused)
      , ("master_node", return . genericResult (const JSNull) showJSON
                          $ QCluster.clusterMasterNodeName cfg)
      , ("drain_flag", showJSON . not <$> isQueueOpen)
      ]

handleCall _ _ cfg (QueryExports nodes lock) =
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRExport)
    (Left <$> nodes) ["node", "export"] lock
392
-- Submits a job without checking whether the queue is drained.
-- The job gets a fresh id and the receive timestamp, is written and
-- replicated, and the new id is returned.
handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops) = runResultT $ do
  jid <- mkResultT $ allocateJobId (Config.getMasterCandidates cfg) qlock
  now <- liftIO currentTimestamp
  queued <- queuedJobFromOpCodes jid ops
  let job = extendJobReasonTrail $ setReceivedTimestamp now queued
  qDir <- liftIO queueDir
  _ <- writeAndReplicateJob cfg qDir job
  -- Enqueueing runs in a separate thread; the reply does not wait for it.
  _ <- liftIO . forkIO $ enqueueNewJobs qstat [job]
  return . showJSON $ fromJobId jid
402
-- Regular submission: refused while the queue is drained.
handleCall qlock qstat cfg (SubmitJob ops) = do
  open <- isQueueOpen
  if open
    then handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops)
    else return . Bad . GenericError $ "Queue drained"
409
-- Submits several jobs at once (refused while the queue is drained).
-- Each job is written to disk individually. Only successfully written
-- jobs are replicated and enqueued. The answer lists one
-- @(success, jobId or error)@ pair per submitted job.
handleCall qlock qstat cfg (SubmitManyJobs lops) = do
  open <- isQueueOpen
  if not open
    then return . Bad . GenericError $ "Queue drained"
    else do
      let mcs = Config.getMasterCandidates cfg
      allocated <- allocateJobIds mcs qlock (length lops)
      case allocated of
        Bad s -> return . Bad . GenericError $ s
        Ok jids -> do
          ts <- currentTimestamp
          rawJobs <- zipWithM queuedJobFromOpCodes jids lops
          let jobs = map (extendJobReasonTrail . setReceivedTimestamp ts)
                         rawJobs
          qDir <- queueDir
          writeResults <- mapM (writeJobToDisk qDir) jobs
          let annotated = zip writeResults jobs
              written = [ job | (res, job) <- annotated, isOk res ]
              describe (res, job)
                | isOk res = showJSON (True, fromJobId $ qjId job)
                | otherwise =
                    showJSON (False, genericResult id (const "") res)
          when (any isBad writeResults) . logWarning
            $ "Writing some jobs failed " ++ show annotated
          replicateManyJobs qDir mcs written
          _ <- forkIO $ enqueueNewJobs qstat written
          return . Ok . JSArray $ map describe annotated
438
-- Waits (bounded by the smaller of the requested timeout and
-- 'C.luxiWfjcTimeout') for a change of the job's file and returns the
-- updated fields and new log entries. Finalized jobs are answered
-- immediately; an unloadable job is reported as 'JobLost'.
handleCall _ _ cfg (WaitForJobChange jid fields prev_job prev_log tmout) = do
  qDir <- queueDir
  let computeFn = computeJobUpdate cfg jid fields prev_log
  loaded <- loadJobFromDisk qDir False jid
  case loaded of
    Bad s -> return . Bad $ JobLost s
    Ok (job, _)
      | jobFinalized job -> Ok . showJSON <$> computeFn
      | otherwise -> do
          answer <- watchFile (liveJobFile qDir jid)
                      (min tmout C.luxiWfjcTimeout)
                      (prev_job, JSArray []) computeFn
          return . Ok $ showJSON answer
452
-- Forwards the watcher pause to the master and candidates and echoes the
-- pause time back ('JSNull' when unpausing). The RPC result is ignored.
handleCall _ _ cfg (SetWatcherPause time) = do
  _ <- executeRpcCall (Config.getMasterOrCandidates cfg)
         (RpcCallSetWatcherPause time)
  return . Ok $ maybe JSNull (showJSON . TimeAsDoubleJSON) time
457
-- Sets or clears the queue drain flag. Locally this creates or removes
-- the drain file; the change is then sent to the master candidates via
-- RPC (its result is ignored). Always answers 'True'.
handleCall _ _ cfg (SetDrainFlag value) = do
  let mcs = Config.getMasterCandidates cfg
  fpath <- jobQueueDrainFile
  -- Clearing a flag that is not set must be a no-op, not an error: a
  -- missing drain file is ignored, while any other I/O failure still
  -- propagates.
  if value
    then writeFile fpath ""
    else catchJust (guard . isDoesNotExistError) (removeFile fpath) return
  _ <- executeRpcCall mcs $ RpcCallSetDrainFlag value
  return . Ok . showJSON $ True
466
-- Changes a job's priority.
-- * If the queue still holds the job, its opcodes are updated and the job
--   is replicated to the master candidates.
-- * If the job is no longer queued (logged as "started"), the new
--   priority is signalled through the job's livelock instead.
-- * Failures are answered as @(False, msg)@.
handleCall _ qstat cfg (ChangeJobPriority jid prio) = do
  let jName = "job " ++ show (fromJobId jid)
  changed <- setJobPriority qstat jid prio
  case changed of
    Bad s -> return . Ok $ showJSON (False, s)
    Ok Nothing -> do
      logDebug $ jName ++ " started, will signal"
      fmap showJSON <$> tellJobPriority (jqLivelock qstat) jid prio
    Ok (Just job) -> runResultT $ do
      qDir <- liftIO queueDir
      liftIO $ replicateManyJobs qDir (Config.getMasterCandidates cfg) [job]
      return $ showJSON (True, "Priorities of pending opcodes for "
                                 ++ jName ++ " have been changed"
                                 ++ " to " ++ show prio)
482
-- Cancels a job.
-- * 'dequeueJob' gives 'Ok True' (the job was removed from the queue):
--   the job is loaded from disk, marked cancelled with the current
--   timestamp, then written and replicated. A failure in that step is
--   answered as @(False, msg)@, not as an error.
-- * 'Ok False' (not queued): cancellation is requested via 'cancelJob'.
--   When @kill@ is set, a background thread makes up to
--   'C.luxidJobDeathDetectionRetries' 'cleanupIfDead' attempts, separated
--   by 'C.luxidJobDeathDelay' (combined with 'orM', presumably stopping at
--   the first success). It then asks WConfd to run 'cleanupLocks'. The
--   reply does not wait for that thread.
-- * A failed dequeue is answered as @(False, msg)@.
handleCall _ qstat cfg (CancelJob jid kill) = do
  let jName = (++) "job " . show $ fromJobId jid
  dequeueResult <- dequeueJob qstat jid
  case dequeueResult of
    Ok True ->
      -- Turn the on-disk update result into the (success, message) answer.
      let jobFileFailed = (,) False
                          . (++) ("Dequeued " ++ jName
                                  ++ ", but failed to mark as cancelled: ")
          jobFileSucceeded _ = (True, "Dequeued " ++ jName)
      in liftM (Ok . showJSON . genericResult jobFileFailed jobFileSucceeded)
         . runResultT $ do
           logDebug $ jName ++ " dequeued, marking as canceled"
           qDir <- liftIO queueDir
           (job, _) <- ResultT $ loadJobFromDisk qDir True jid
           now <- liftIO currentTimestamp
           let job' = cancelQueuedJob now job
           writeAndReplicateJob cfg qDir job'
    Ok False -> do
      logDebug $ jName ++ " not queued; trying to cancel directly"
      result <- fmap showJSON <$> cancelJob kill (jqLivelock qstat) jid
      -- After a kill, wait for the job to die, then clean up locks.
      when kill . void . forkIO $ do
        _ <- orM
             . intersperse (threadDelay C.luxidJobDeathDelay >> return False)
             . replicate C.luxidJobDeathDetectionRetries
             $ cleanupIfDead qstat jid
        wconfdsocket <- Path.defaultWConfdSocket
        wconfdclient <- getWConfdClient wconfdsocket
        void . runResultT $ runRpcClient cleanupLocks wconfdclient
      return result
    Bad s -> return . Ok . showJSON $ (False, s)
513
-- Archives a single job by renaming its file from the live to the archive
-- location. The local rename happens under the queue lock; the same rename
-- is then requested on the master candidates via RPC (result ignored).
-- Answers 'False' when the job cannot be loaded or is not finalized yet;
-- an unexpected rename failure is reported as a 'JobQueueError'.
handleCall qlock _ cfg (ArchiveJob jid) =
  -- By adding a layer of MaybeT, we can prematurely end a computation
  -- using 'mzero' or other 'MonadPlus' primitive and return 'Ok False'.
  runResultT . liftM (showJSON . fromMaybe False) . runMaybeT $ do
    qDir <- liftIO queueDir
    let mcs = Config.getMasterCandidates cfg
        live = liveJobFile qDir jid
        archive = archivedJobFile qDir jid
    withLock qlock $ do
      -- A job that cannot be loaded yields 'False', not an error.
      (job, _) <- (lift . mkResultT $ loadJobFromDisk qDir False jid)
                    `orElse` mzero
      -- Only finalized jobs may be archived.
      guard $ jobFinalized job
      lift . withErrorT JobQueueError
           . annotateError "Archiving failed in an unexpected way"
           . mkResultT $ safeRenameFile queueDirPermissions live archive
    _ <- liftIO . executeRpcCall mcs
                $ RpcCallJobqueueRename [(live, archive)]
    return True
532
-- Archives old jobs from the queue directory in job-id order, holding the
-- queue lock for the whole run.
handleCall qlock _ cfg (AutoArchiveJobs age timeout) = do
  qDir <- queueDir
  listed <- getJobIDs [qDir]
  case listed of
    Bad s -> return . Bad . JobQueueError $ show s
    Ok jids -> do
      let archiveSorted = archiveJobs cfg age timeout $ sortJobIDs jids
      Ok . showJSON <$> withLock qlock archiveSorted
543
-- 'PickupJob' is not served to luxi clients.
handleCall _ _ _ (PickupJob _) =
  return . Bad . GenericError $
    "Luxi call 'PickupJob' is for internal use only"

{-# ANN handleCall "HLint: ignore Too strict if" #-}
549
-- | Query the status of a job and return the requested fields
-- and the logs newer than the given log number.
--
-- @prev_log@ is either 'JSNull', which keeps every log entry, or a number;
-- in the latter case only entries whose leading number is larger are kept.
-- If the job query does not have the expected shape, every field is
-- 'JSNull' and no logs are returned.
computeJobUpdate :: ConfigData -> JobId -> [String] -> JSValue
                 -> IO (JSValue, JSValue)
computeJobUpdate cfg jid fields prev_log = do
  let sjid = show $ fromJobId jid
  logDebug $ "Inspecting fields " ++ show fields ++ " of job " ++ sjid
  jobQuery <- handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
                [Right . fromIntegral $ fromJobId jid] ("oplog" : fields) False
  let (rfields, rlogs) = case jobQuery of
        Ok (JSArray [JSArray (JSArray logs : answer)]) ->
          (answer, JSArray . filter (isNewer prev_log) $ concatMap entries logs)
        _ -> ([JSNull | _ <- fields], JSArray [])
  logDebug $ "Updates for job " ++ sjid ++ " are " ++ encode (rfields, rlogs)
  return (JSArray rfields, rlogs)
  where
    -- Elements of a JSON array; anything else contributes nothing.
    entries (JSArray xs) = xs
    entries _ = []
    -- Whether a log entry is newer than the previously seen log number.
    isNewer JSNull (JSArray _) = True
    isNewer (JSRational _ n) (JSArray (JSRational _ m:_)) = n < m
    isNewer _ _ = False
571
572
-- | Everything the luxi handler needs: the job-queue lock, the job queue
-- status and an action that reads the current configuration.
type LuxiConfig = (Lock, JQStatus, ConfigReader)

-- | Runs a single luxi operation against the configuration that is
-- current at call time. The result is always paired with 'True'.
luxiExec
  :: LuxiConfig
  -> LuxiOp
  -> IO (Bool, GenericResult GanetiException JSValue)
luxiExec (qlock, qstat, creader) op = do
  currentCfg <- creader
  (,) True <$> handleCallWrapper qlock qstat currentCfg op
583
-- | UDS server handler for luxi requests: parses calls with
-- 'decodeLuxiCall' and executes them with 'luxiExec'.
luxiHandler :: LuxiConfig -> U.Handler LuxiOp IO JSValue
luxiHandler cfg =
  U.Handler { U.hExec = luxiExec cfg
            , U.hParse = decodeLuxiCall
            , U.hInputLogShort = strOfOp
            , U.hInputLogLong = show
            }
590
-- | What 'prepMain' hands to 'main': the bound luxi server, the reference
-- holding the current configuration and the job queue status.
type PrepResult = (Server, IORef (Result ConfigData), JQStatus)
593
-- | Activate the master IP address.
--
-- Loads the cluster configuration from disk and asks the master node(s)
-- via RPC to activate the master IP. RPC errors are only logged; the
-- result is 'Bad' only if the configuration cannot be loaded.
activateMasterIP :: IO (Result ())
activateMasterIP = runResultT $ do
  liftIO $ logDebug "Activating master IP address"
  config <- mkResultT . Config.loadConfig =<< liftIO Path.clusterConfFile
  let params = Config.getMasterNetworkParameters config
      useExternalScript = clusterUseExternalMipScript $ configCluster config
  liftIO . logDebug $ "Master IP params: " ++ show params
  liftIO $ do
    res <- executeRpcCall (Config.getMasterNodes config)
             (RpcCallNodeActivateMasterIp params useExternalScript)
    _ <- logRpcErrors res
    logDebug "finished activating master IP address"
608
-- | Check function for luxid; delegates to the shared master-verification
-- option handling.
checkMain :: CheckFn ()
checkMain = handleMasterVerificationOptions
612
-- | Prepare function for luxid.
--
-- Exits unless 'Exec.isForkSupported' holds. It then (re)binds the luxi
-- socket and creates the job queue status around a configuration
-- reference that starts out as "not yet loaded".
prepMain :: PrepFn () PrepResult
prepMain _ _ = do
  forkOk <- Exec.isForkSupported
  exitUnless forkOk "The daemon must be compiled without -threaded"
  socketPath <- Path.defaultQuerySocket
  cleanupSocket socketPath
  server <- describeError "binding to the Luxi socket"
              Nothing (Just socketPath) $ getLuxiServer True socketPath
  configRef <- newIORef (Bad "Configuration not yet loaded")
  status <- emptyJQStatus configRef
  return (server, configRef, status)
626
-- | Main function.
--
-- Subscribes to configuration updates, takes the job-queue lock file,
-- ignores SIGCHLD, activates the master IP in the background and starts
-- the scheduler. It then serves luxi requests forever, closing the server
-- and removing the lock file on exit.
main :: MainFn () PrepResult
main _ _ (server, cref, jq) = do
  -- Subscribe to config updates. If the config changes, write new config and
  -- check if the changes should trigger the scheduler.
  initConfigReader onNewConfig

  qlockFile <- jobQueueLockFile
  _ <- lockFile qlockFile >>= exitIfBad "Failed to obtain the job-queue lock"
  qlock <- newLock

  _ <- P.installHandler P.sigCHLD P.Ignore Nothing

  _ <- forkIO . void $ activateMasterIP

  initJQScheduler jq

  finally
    (forever $ U.listener (luxiHandler (qlock, jq, readIORef cref)) server)
    (closeServer server >> removeFile qlockFile)
  where
    -- Stores the new config and runs the scheduler if needed.
    onNewConfig newConfig = do
      runScheduler <- atomicModifyIORef cref $ \oldConfig ->
        case (oldConfig, newConfig) of
          (Ok old, Ok new) -> (newConfig, configChangeNeedsRescheduling old new)
          _ -> (newConfig, True) -- no old or new config, schedule
      when runScheduler (scheduleSomeJobs jq)