aefe129c506f04d2a3d72a6c401bc628b337fe2f
[ganeti-github.git] / src / Ganeti / Query / Server.hs
1 {-# LANGUAGE FlexibleContexts #-}
2
3 {-| Implementation of the Ganeti Query2 server.
4
5 -}
6
7 {-
8
9 Copyright (C) 2012, 2013, 2014 Google Inc.
10 All rights reserved.
11
12 Redistribution and use in source and binary forms, with or without
13 modification, are permitted provided that the following conditions are
14 met:
15
16 1. Redistributions of source code must retain the above copyright notice,
17 this list of conditions and the following disclaimer.
18
19 2. Redistributions in binary form must reproduce the above copyright
20 notice, this list of conditions and the following disclaimer in the
21 documentation and/or other materials provided with the distribution.
22
23 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
24 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
25 TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
26 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
27 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
28 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
29 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34
35 -}
36
37 module Ganeti.Query.Server
38 ( main
39 , checkMain
40 , prepMain
41 ) where
42
43 import Prelude ()
44 import Ganeti.Prelude
45
46 import Control.Concurrent
47 import Control.Exception
48 import Control.Lens ((.~))
49 import Control.Monad (forever, when, mzero, guard, zipWithM, liftM, void)
50 import Control.Monad.Base (MonadBase, liftBase)
51 import Control.Monad.Error.Class (MonadError)
52 import Control.Monad.IO.Class
53 import Control.Monad.Trans (lift)
54 import Control.Monad.Trans.Maybe
55 import qualified Data.ByteString.UTF8 as UTF8
56 import qualified Data.Set as Set (toList)
57 import Data.IORef
58 import Data.List (intersperse)
59 import Data.Maybe (fromMaybe)
60 import qualified Text.JSON as J
61 import Text.JSON (encode, showJSON, JSValue(..))
62 import System.Info (arch)
63 import System.Directory
64 import System.Posix.Process (getProcessID)
65 import System.Posix.Signals as P
66
67 import qualified Ganeti.Constants as C
68 import qualified Ganeti.ConstantUtils as ConstantUtils (unFrozenSet)
69 import Ganeti.Errors
70 import qualified Ganeti.Path as Path
71 import Ganeti.Daemon
72 import Ganeti.Daemon.Utils (handleMasterVerificationOptions)
73 import Ganeti.Objects
74 import Ganeti.Objects.Lens (configFiltersL)
75 import qualified Ganeti.Config as Config
76 import qualified Ganeti.Compat as Compat
77 import Ganeti.ConfigReader
78 import Ganeti.BasicTypes
79 import Ganeti.JQueue
80 import Ganeti.JQScheduler
81 import Ganeti.JSON (TimeAsDoubleJSON(..), alterContainerL, lookupContainer)
82 import Ganeti.Locking.Locks (ClientId(..), ClientType(ClientOther))
83 import Ganeti.Logging
84 import Ganeti.Luxi
85 import qualified Ganeti.Query.Language as Qlang
86 import qualified Ganeti.Query.Cluster as QCluster
87 import Ganeti.Path ( queueDir, jobQueueLockFile, jobQueueDrainFile )
88 import Ganeti.Rpc
89 import qualified Ganeti.Query.Exec as Exec
90 import Ganeti.Query.Query
91 import Ganeti.Query.Filter (makeSimpleFilter)
92 import Ganeti.THH.HsRPC (runRpcClient, RpcClientMonad)
93 import Ganeti.Types
94 import qualified Ganeti.UDSServer as U (Handler(..), listener)
95 import Ganeti.Utils ( lockFile, exitIfBad, exitUnless, watchFile
96 , safeRenameFile, newUUID, isUUID )
97 import Ganeti.Utils.Monad (orM)
98 import Ganeti.Utils.MVarLock
99 import qualified Ganeti.Version as Version
100 import Ganeti.WConfd.Client ( getWConfdClient, withLockedConfig, writeConfig
101 , cleanupLocks)
102
103
-- | Builds the 'ClientId' under which luxid talks to WConfd for the
-- request currently being served.
--
-- The identifier embeds the id of the calling Haskell thread, so requests
-- served on different threads obtain different 'ClientId's. The lock file
-- is the job queue's livelock file from the given 'JQStatus', and the pid
-- is the one of the running luxid process.
makeLuxidClientId :: JQStatus -> IO ClientId
makeLuxidClientId status = do
  selfTid <- myThreadId
  ownPid <- getProcessID
  return ClientId { ciIdentifier = ClientOther $ "luxid-" ++ show selfTid
                  , ciLockFile = jqLivelock status
                  , ciPid = ownPid
                  }
118
-- | Connects to WConfd, takes the configuration lock and runs the given
-- WConfd RPC action on the locked 'ConfigData'.
--
-- luxid needs this whenever it wants to change the configuration.
--
-- Example:
--
-- > cid <- makeLuxidClientId ...
-- > withLockedWconfdConfig cid $ \lockedCfg -> do
-- >   -- some (IO) action that must run while the lock is held
-- >   writeConfig cid (updateConfig lockedCfg)
withLockedWconfdConfig
  :: (MonadBase IO m, MonadError GanetiException m)
  => ClientId
  -> (ConfigData -> RpcClientMonad a)
  -> m a
withLockedWconfdConfig cid f =
  liftBase (Path.defaultWConfdSocket >>= getWConfdClient)
    >>= runRpcClient (withLockedConfig cid False f)
138
-- | Helper for classic queries.
--
-- Builds an OR filter that matches the requested names against every
-- field produced by the given field selectors, runs the query and
-- converts the result with 'queryCompat'. Sync queries are rejected
-- with an 'OpPrereqError'.
handleQuery :: [Qlang.ItemType -> Qlang.FilterField] -- ^ Fields to put into
                                                     -- the query
            -> ConfigData              -- ^ Cluster config
            -> Qlang.ItemType          -- ^ Query type
            -> [Either String Integer] -- ^ Requested names
                                       -- (empty means all)
            -> [String]                -- ^ Requested fields
            -> Bool                    -- ^ Whether to do sync queries or not
            -> IO (GenericResult GanetiException JSValue)
handleQuery filterFields cfg qkind names fields sync
  | sync =
      return . Bad $ OpPrereqError "Sync queries are not allowed" ECodeInval
  | otherwise = do
      let nameFilterOn field = makeSimpleFilter (field qkind) names
          combined = Qlang.OrFilter (map nameFilterOn filterFields)
      answer <- query cfg True (Qlang.Query qkind fields combined)
      return (showJSON <$> (answer >>= queryCompat))
156
-- | Helper for classic queries that matches the requested names against
-- both the @name@ and the @uuid@ field.
handleClassicQuery :: ConfigData              -- ^ Cluster config
                   -> Qlang.ItemType          -- ^ Query type
                   -> [Either String Integer] -- ^ Requested names
                                              -- (empty means all)
                   -> [String]                -- ^ Requested fields
                   -> Bool                    -- ^ Whether to do sync queries
                                              -- or not
                   -> IO (GenericResult GanetiException JSValue)
handleClassicQuery = handleQuery nameOrUuid
  where nameOrUuid = [nameField, uuidField]

-- | Like 'handleClassicQuery', but matches the requested names against
-- the @uuid@ field only.
handleUuidQuery :: ConfigData              -- ^ Cluster config
                -> Qlang.ItemType          -- ^ Query type
                -> [Either String Integer] -- ^ Requested names
                                           -- (empty means all)
                -> [String]                -- ^ Requested fields
                -> Bool                    -- ^ Whether to do sync queries
                                           -- or not
                -> IO (GenericResult GanetiException JSValue)
handleUuidQuery = handleQuery uuidOnly
  where uuidOnly = [uuidField]
177
-- | Minimal wrapper to handle the missing config case.
--
-- With a valid configuration the request is dispatched to 'handleCall';
-- otherwise it fails with a 'ConfigurationError' carrying the reason why
-- no configuration is available.
handleCallWrapper :: Lock -> JQStatus -> Result ConfigData
                  -> LuxiOp -> IO (ErrorResult JSValue)
handleCallWrapper qlock qstat cfgResult op =
  case cfgResult of
    Ok config -> handleCall qlock qstat config op
    Bad msg -> return . Bad . ConfigurationError $
      "I do not have access to a valid configuration, cannot\
      \ process queries: " ++ msg
186
-- | Actual luxi operation handler.
--
-- There is one equation per 'LuxiOp' constructor; each receives the job
-- queue lock, the job queue status and the current (valid) configuration.
handleCall :: Lock -> JQStatus
           -> ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
handleCall _ _ cdata QueryClusterInfo =
  let cluster = configCluster cdata
      hypervisors = clusterEnabledHypervisors cluster
      diskTemplates = clusterEnabledDiskTemplates cluster
      def_hv = case hypervisors of
                 x:_ -> showJSON x
                 [] -> JSNull
      bits = show (Compat.finiteBitSize (0::Int)) ++ "bits"
      arch_tuple = [bits, arch]
      -- The answer object. It needs the master node's name, so it takes
      -- that name as an argument instead of holding an 'undefined'
      -- placeholder that is only safe because it is never forced.
      mkObj masterName =
        [ ("software_version", showJSON C.releaseVersion)
        , ("protocol_version", showJSON C.protocolVersion)
        , ("config_version", showJSON C.configVersion)
        , ("os_api_version", showJSON . maximum .
                             Set.toList . ConstantUtils.unFrozenSet $
                             C.osApiVersions)
        , ("export_version", showJSON C.exportVersion)
        , ("vcs_version", showJSON Version.version)
        , ("architecture", showJSON arch_tuple)
        , ("name", showJSON $ clusterClusterName cluster)
        , ("master", showJSON masterName)
        , ("default_hypervisor", def_hv)
        , ("enabled_hypervisors", showJSON hypervisors)
        , ("hvparams", showJSON $ clusterHvparams cluster)
        , ("os_hvp", showJSON $ clusterOsHvp cluster)
        , ("beparams", showJSON $ clusterBeparams cluster)
        , ("osparams", showJSON $ clusterOsparams cluster)
        , ("ipolicy", showJSON $ clusterIpolicy cluster)
        , ("nicparams", showJSON $ clusterNicparams cluster)
        , ("ndparams", showJSON $ clusterNdparams cluster)
        , ("diskparams", showJSON $ clusterDiskparams cluster)
        , ("candidate_pool_size",
           showJSON $ clusterCandidatePoolSize cluster)
        , ("max_running_jobs",
           showJSON $ clusterMaxRunningJobs cluster)
        , ("max_tracked_jobs",
           showJSON $ clusterMaxTrackedJobs cluster)
        , ("mac_prefix", showJSON $ clusterMacPrefix cluster)
        , ("master_netdev", showJSON $ clusterMasterNetdev cluster)
        , ("master_netmask", showJSON $ clusterMasterNetmask cluster)
        , ("use_external_mip_script",
           showJSON $ clusterUseExternalMipScript cluster)
        , ("volume_group_name",
           maybe JSNull showJSON (clusterVolumeGroupName cluster))
        , ("drbd_usermode_helper",
           maybe JSNull showJSON (clusterDrbdUsermodeHelper cluster))
        , ("file_storage_dir", showJSON $ clusterFileStorageDir cluster)
        , ("shared_file_storage_dir",
           showJSON $ clusterSharedFileStorageDir cluster)
        , ("gluster_storage_dir",
           showJSON $ clusterGlusterStorageDir cluster)
        , ("maintain_node_health",
           showJSON $ clusterMaintainNodeHealth cluster)
        , ("ctime", showJSON . TimeAsDoubleJSON $ clusterCtime cluster)
        , ("mtime", showJSON . TimeAsDoubleJSON $ clusterMtime cluster)
        , ("uuid", showJSON $ clusterUuid cluster)
        , ("tags", showJSON $ clusterTags cluster)
        , ("uid_pool", showJSON $ clusterUidPool cluster)
        , ("default_iallocator",
           showJSON $ clusterDefaultIallocator cluster)
        , ("default_iallocator_params",
           showJSON $ clusterDefaultIallocatorParams cluster)
        , ("reserved_lvs", showJSON $ clusterReservedLvs cluster)
        , ("primary_ip_version",
           showJSON . ipFamilyToVersion $ clusterPrimaryIpFamily cluster)
        , ("prealloc_wipe_disks",
           showJSON $ clusterPreallocWipeDisks cluster)
        , ("hidden_os", showJSON $ clusterHiddenOs cluster)
        , ("blacklisted_os", showJSON $ clusterBlacklistedOs cluster)
        , ("enabled_disk_templates", showJSON diskTemplates)
        , ("install_image", showJSON $ clusterInstallImage cluster)
        , ("instance_communication_network",
           showJSON (clusterInstanceCommunicationNetwork cluster))
        , ("zeroing_image", showJSON $ clusterZeroingImage cluster)
        , ("compression_tools",
           showJSON $ clusterCompressionTools cluster)
        , ("enabled_user_shutdown",
           showJSON $ clusterEnabledUserShutdown cluster)
        , ("enabled_data_collectors",
           showJSON . fmap dataCollectorActive
             $ clusterDataCollectors cluster)
        , ("data_collector_interval",
           showJSON . fmap dataCollectorInterval
             $ clusterDataCollectors cluster)
        , ("diagnose_data_collector_filename",
           showJSON $ clusterDiagnoseDataCollectorFilename cluster)
        , ("maint_round_delay",
           showJSON . maintRoundDelay $ configMaintenance cdata)
        , ("maint_balance",
           showJSON . maintBalance $ configMaintenance cdata)
        , ("maint_balance_threshold",
           showJSON . maintBalanceThreshold $ configMaintenance cdata)
        , ("hv_state",
           showJSON $ clusterHvStateStatic cluster)
        , ("disk_state",
           showJSON $ clusterDiskStateStatic cluster)
        , ("modify_ssh_setup",
           showJSON $ clusterModifySshSetup cluster)
        , ("ssh_key_type", showJSON $ clusterSshKeyType cluster)
        , ("ssh_key_bits", showJSON $ clusterSshKeyBits cluster)
        ]
  in case QCluster.clusterMasterNodeName cdata of
       -- If the master node's name cannot be determined, the whole
       -- query fails with that error.
       Ok masterName -> return . Ok . J.makeObj $ mkObj masterName
       Bad ex -> return $ Bad ex
297
-- Tags of the cluster or of a named group/node/instance/network; a lookup
-- failure for the named object is returned as the error.
handleCall _ _ cfg (QueryTags kind name) =
  return . fmap J.showJSON $ tagsOf kind
  where
    tagsOf TagKindCluster = Ok . clusterTags $ configCluster cfg
    tagsOf TagKindGroup = groupTags <$> Config.getGroup cfg name
    tagsOf TagKindNode = nodeTags <$> Config.getNode cfg name
    tagsOf TagKindInstance = instTags <$> Config.getInstance cfg name
    tagsOf TagKindNetwork = networkTags <$> Config.getNetwork cfg name

-- Generic Query2 request against the current configuration.
handleCall _ _ cfg (Query qkind qfields qfilter) =
  fmap J.showJSON <$> query cfg True (Qlang.Query qkind qfields qfilter)

-- Field descriptions for a Query2 item kind; needs no configuration.
handleCall _ _ _ (QueryFields qkind qfields) =
  return . fmap J.showJSON . queryFields $ Qlang.QueryFields qkind qfields
314
-- Classic queries. Nodes, instances and groups are requested by name and
-- matched against both the @name@ and @uuid@ fields; jobs are requested by
-- numeric id; filters are matched against their @uuid@ field only.
handleCall _ _ cfg (QueryNodes names fields lock) =
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNode)
                     (Left <$> names) fields lock

handleCall _ _ cfg (QueryInstances names fields lock) =
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRInstance)
                     (Left <$> names) fields lock

handleCall _ _ cfg (QueryGroups names fields lock) =
  handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRGroup)
                     (Left <$> names) fields lock

handleCall _ _ cfg (QueryJobs names fields) =
  handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
                     [Right . fromIntegral $ fromJobId j | j <- names]
                     fields False

handleCall _ _ cfg (QueryFilters uuids fields) =
  handleUuidQuery cfg (Qlang.ItemTypeLuxi Qlang.QRFilter)
                  (Left <$> uuids) fields False
334
-- Adds a new filter rule (when no UUID is given, a fresh one is generated)
-- or replaces the rule stored under the given UUID. The rule's watermark is
-- the job id serial read from disk while WConfd's config lock is held, and
-- a "luxid" entry is appended to the supplied reason trail. The answer is
-- the UUID of the added/replaced rule.
handleCall _ status _ (ReplaceFilter mUuid priority predicates action
                                     reason) =
  -- Handles both adding new filter and changing existing ones.
  runResultT $ do

    -- Check that uuid `String` is actually a UUID.
    uuid <- case mUuid of
      Nothing -> liftIO newUUID -- Request to add a new filter
      Just u -- Request to edit an existing filter
        | isUUID u  -> return u
        | otherwise -> fail "Unable to parse UUID"

    timestamp <- liftIO $ reasonTrailTimestamp <$> currentTimestamp
    let luxidReason = ("luxid", "", timestamp)

    -- Ask WConfd to change the config for us.
    cid <- liftIO $ makeLuxidClientId status
    withLockedWconfdConfig cid $ \lockedCfg -> do
      -- Read the latest JobID inside the WConfd lock to really get the
      -- most recent one (locking may block us for some time).
      serial <- liftIO readSerialFromDisk
      case serial of
        Bad err -> fail $ "AddFilter: reading current JobId failed: " ++ err
        Ok watermark -> do
          let rule = FilterRule { frWatermark = watermark
                                , frPriority = priority
                                , frPredicates = predicates
                                , frAction = action
                                , frReasonTrail = reason ++ [luxidReason]
                                , frUuid = UTF8.fromString uuid
                                }
          -- Insert or overwrite the rule under its UUID and write the
          -- resulting config back through WConfd.
          writeConfig cid
            . (configFiltersL . alterContainerL (UTF8.fromString uuid)
                 .~ Just rule)
            $ lockedCfg

    -- Return UUID of added/replaced filter.
    return $ showJSON uuid
373
-- Removes the filter rule with the given UUID. Fails if our copy of the
-- configuration has no such rule; otherwise WConfd performs the removal
-- under its config lock. Answers with null.
handleCall _ status cfg (DeleteFilter uuid) = runResultT $ do
  let filterKey = UTF8.fromString uuid
      notFound = "Filter rule with UUID " ++ uuid ++ " does not exist"
  void $ lookupContainer (failError notFound) filterKey (configFilters cfg)
  clientId <- liftIO $ makeLuxidClientId status
  withLockedWconfdConfig clientId $
    writeConfig clientId
      . (configFiltersL . alterContainerL filterKey .~ Nothing)
  return JSNull
389
-- Classic network query, by name or UUID.
handleCall _ _ cfg (QueryNetworks names fields lock) =
  handleClassicQuery cfg networkKind (Left <$> names) fields lock
  where networkKind = Qlang.ItemTypeOpCode Qlang.QRNetwork
393
-- Returns the values of the requested configuration keys, in request
-- order; unknown keys yield null.
handleCall _ _ cfg (QueryConfigValues fields) = do
  let cluster = configCluster cfg
      fromCluster getter = return . showJSON $ getter cluster
      known :: [(String, IO JSValue)]
      known =
        [ ("cluster_name", fromCluster clusterClusterName)
        , ("watcher_pause",
           maybe JSNull showJSON <$> QCluster.isWatcherPaused)
        , ("master_node", return . genericResult (const JSNull) showJSON
                            $ QCluster.clusterMasterNodeName cfg)
        , ("drain_flag", showJSON . not <$> isQueueOpen)
        , ("modify_ssh_setup", fromCluster clusterModifySshSetup)
        , ("ssh_key_type", fromCluster clusterSshKeyType)
        , ("ssh_key_bits", fromCluster clusterSshKeyBits)
        ]
      valueOf key = fromMaybe (return JSNull) (lookup key known)
  values <- mapM valueOf fields
  return . Ok $ showJSON values
410
-- Classic export query; always returns the "node" and "export" fields.
handleCall _ _ cfg (QueryExports nodes lock) =
  let exportKind = Qlang.ItemTypeOpCode Qlang.QRExport
  in handleClassicQuery cfg exportKind (Left <$> nodes) ["node", "export"] lock
414
-- Submits a single job without checking whether the queue is open: a job
-- id is allocated, the job is written and replicated, and enqueuing happens
-- in a separate thread. Answers with the new job id.
handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops) = runResultT $ do
  newJid <- mkResultT $ allocateJobId (Config.getMasterCandidates cfg) qlock
  receivedAt <- liftIO currentTimestamp
  freshJob <- queuedJobFromOpCodes newJid ops
  let job = extendJobReasonTrail $ setReceivedTimestamp receivedAt freshJob
  qDir <- liftIO queueDir
  _ <- writeAndReplicateJob cfg qDir job
  _ <- liftIO . forkIO $ enqueueNewJobs qstat [job]
  return . showJSON $ fromJobId newJid
424
-- Submits a single job, refusing with "Queue drained" when the queue is
-- not open.
handleCall qlock qstat cfg (SubmitJob ops) = do
  queueOpen <- isQueueOpen
  if queueOpen
    then handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops)
    else return . Bad . GenericError $ "Queue drained"
431
-- Submits several jobs at once. Refuses with "Queue drained" when the queue
-- is not open. Otherwise one job id is allocated per opcode list, every job
-- is written to disk, and only the successfully written jobs are replicated
-- to the master candidates and enqueued (in a separate thread). The answer
-- is a list with one (success, job id or error message) pair per job.
handleCall qlock qstat cfg (SubmitManyJobs lops) =
  do
    open <- isQueueOpen
    if not open
       then return . Bad . GenericError $ "Queue drained"
       else do
         let mcs = Config.getMasterCandidates cfg
         result_jobids <- allocateJobIds mcs qlock (length lops)
         case result_jobids of
           Bad s -> return . Bad . GenericError $ s
           Ok jids -> do
             ts <- currentTimestamp
             jobs <- liftM (map $ extendJobReasonTrail . setReceivedTimestamp ts)
                       $ zipWithM queuedJobFromOpCodes jids lops
             qDir <- queueDir
             write_results <- mapM (writeJobToDisk qDir) jobs
             let annotated_results = zip write_results jobs
                 succeeded = map snd $ filter (isOk . fst) annotated_results
             -- Partial failures are only logged; the per-job answer below
             -- reports which jobs were not written.
             when (any isBad write_results) . logWarning
               $ "Writing some jobs failed " ++ show annotated_results
             replicateManyJobs qDir mcs succeeded
             _ <- forkIO $ enqueueNewJobs qstat succeeded
             return . Ok . JSArray
               . map (\(res, job) ->
                       if isOk res
                         then showJSON (True, fromJobId $ qjId job)
                         else showJSON (False, genericResult id (const "") res))
               $ annotated_results
460
-- General WaitForJobChange: updates are computed through the luxi query
-- machinery, which needs the configuration.
handleCall _ _ cfg (WaitForJobChange jid fields prev_job prev_log tmout) =
  let computeUpdate = computeJobUpdate cfg jid fields prev_log
  in waitForJobChange jid prev_job tmout computeUpdate
463
-- Forwards the watcher pause setting to the master and master candidates
-- (the RPC result is ignored) and echoes the pause time, or null.
handleCall _ _ cfg (SetWatcherPause time) = do
  _ <- executeRpcCall (Config.getMasterOrCandidates cfg)
         (RpcCallSetWatcherPause time)
  return . Ok $ maybe JSNull (showJSON . TimeAsDoubleJSON) time
468
-- Sets the job queue drain flag by creating the drain file, or clears it
-- by removing that file, then sends the new value to the master
-- candidates via RPC (the RPC result is ignored). Answers with true.
handleCall _ _ cfg (SetDrainFlag value) = do
  let mcs = Config.getMasterCandidates cfg
  fpath <- jobQueueDrainFile
  if value
    then writeFile fpath ""
    else do
      -- Clearing an already-clear flag must be a no-op: 'removeFile'
      -- throws when the file is missing, which would abort the request
      -- before the RPC below reaches the master candidates.
      flagSet <- doesFileExist fpath
      when flagSet $ removeFile fpath
  _ <- executeRpcCall mcs $ RpcCallSetDrainFlag value
  return . Ok . showJSON $ True
477
-- Changes the priority of a job. For a job still handled by the queue the
-- updated job is replicated to the master candidates; when
-- 'setJobPriority' returns no job (the job has started), the running job
-- is signalled via 'tellJobPriority'. A failure is reported as
-- (false, message).
handleCall _ qstat cfg (ChangeJobPriority jid prio) = do
  let jobLabel = "job " ++ show (fromJobId jid)
  changed <- setJobPriority qstat jid prio
  case changed of
    Bad msg -> return . Ok $ showJSON (False, msg)
    Ok Nothing -> do
      logDebug $ jobLabel ++ " started, will signal"
      fmap showJSON <$> tellJobPriority (jqLivelock qstat) jid prio
    Ok (Just job) -> runResultT $ do
      qDir <- liftIO queueDir
      liftIO $ replicateManyJobs qDir (Config.getMasterCandidates cfg) [job]
      return $ showJSON (True, "Priorities of pending opcodes for "
                                 ++ jobLabel ++ " have been changed"
                                 ++ " to " ++ show prio)
493
-- Cancels a job.
--
-- * If the job could be dequeued, it is loaded from disk, marked as
--   cancelled, and written/replicated again; the answer reports whether
--   that marking succeeded.
-- * If it was not queued, cancellation is attempted directly via
--   'cancelJob'. When @kill@ is set, a background thread additionally runs
--   'cleanupIfDead' up to 'C.luxidJobDeathDetectionRetries' times, with
--   'C.luxidJobDeathDelay' between attempts, and afterwards asks WConfd to
--   run 'cleanupLocks'.
-- * A dequeue error is reported as (false, message).
handleCall _ qstat  cfg (CancelJob jid kill) = do
  let jName = (++) "job " . show $ fromJobId jid
  dequeueResult <- dequeueJob qstat jid
  case dequeueResult of
    Ok True ->
      let jobFileFailed = (,) False
                          . (++) ("Dequeued " ++ jName
                                  ++ ", but failed to mark as cancelled: ")
          jobFileSucceeded _ = (True, "Dequeued " ++ jName)
      in liftM (Ok . showJSON . genericResult jobFileFailed jobFileSucceeded)
         . runResultT $ do
            logDebug $ jName ++ " dequeued, marking as canceled"
            qDir <- liftIO queueDir
            (job, _) <- ResultT $ loadJobFromDisk qDir True jid
            now <- liftIO currentTimestamp
            let job' = cancelQueuedJob now job
            writeAndReplicateJob cfg qDir job'
    Ok False -> do
      logDebug $ jName ++ " not queued; trying to cancel directly"
      result <- fmap showJSON <$> cancelJob kill (jqLivelock qstat) jid
      -- The answer does not wait for this thread; lock cleanup happens
      -- only after the death-detection attempts have finished.
      when kill . void . forkIO $ do
        _ <- orM
             . intersperse (threadDelay C.luxidJobDeathDelay >> return False)
             . replicate C.luxidJobDeathDetectionRetries
             $ cleanupIfDead qstat jid
        wconfdsocket <- Path.defaultWConfdSocket
        wconfdclient <- getWConfdClient wconfdsocket
        void . runResultT $ runRpcClient cleanupLocks wconfdclient
      return result
    Bad s -> return . Ok . showJSON $ (False, s)
524
-- Archives a single finalized job. While holding the queue lock, the job is
-- loaded and, if finalized, its file is renamed from the live to the
-- archive location. The master candidates are then asked to perform the
-- same rename via RPC (result ignored). Answers true on success. Answers
-- false when the job cannot be loaded or is not finalized. A failing local
-- rename is reported as a 'JobQueueError'.
handleCall qlock _ cfg (ArchiveJob jid) =
  -- By adding a layer of MaybeT, we can prematurely end a computation
  -- using 'mzero' or other 'MonadPlus' primitive and return 'Ok False'.
  runResultT . liftM (showJSON . fromMaybe False) . runMaybeT $ do
    qDir <- liftIO queueDir
    let mcs = Config.getMasterCandidates cfg
        live = liveJobFile qDir jid
        archive = archivedJobFile qDir jid
    withLock qlock $ do
      (job, _) <- (lift . mkResultT $ loadJobFromDisk qDir False jid)
                    `orElse` mzero
      guard $ jobFinalized job
      lift . withErrorT JobQueueError
           . annotateError "Archiving failed in an unexpected way"
           . mkResultT $ safeRenameFile queueDirPermissions live archive
    _ <- liftIO . executeRpcCall mcs
                $ RpcCallJobqueueRename [(live, archive)]
    return True
543
-- Archives old jobs: lists all job ids in the queue directory and passes
-- them, sorted, to 'archiveJobs' while holding the queue lock. Failing to
-- list the jobs is reported as a 'JobQueueError'.
handleCall qlock _ cfg (AutoArchiveJobs age timeout) = do
  qDir <- queueDir
  listed <- getJobIDs [qDir]
  case listed of
    Bad listErr -> return . Bad . JobQueueError $ show listErr
    Ok jids ->
      Ok . showJSON <$> withLock qlock (archiveJobs cfg age timeout
                                                    (sortJobIDs jids))
554
-- PickupJob is not served to external luxi clients.
handleCall _ _ _ (PickupJob _) =
  return (Bad (GenericError "Luxi call 'PickupJob' is for internal use only"))

{-# ANN handleCall "HLint: ignore Too strict if" #-}
560
-- | Special-case handler for the WaitForJobChange call when only the
-- @status@ field is requested.
--
-- Updates are computed by 'computeJobUpdateStatus', which reads the job
-- from disk and therefore needs no 'ConfigData'.
handleWaitForJobChangeStatus :: JobId -> JSValue -> JSValue -> Int
                             -> IO (ErrorResult JSValue)
handleWaitForJobChangeStatus jid prev_job prev_log tmout =
  let statusUpdate = computeJobUpdateStatus jid prev_log
  in waitForJobChange jid prev_job tmout statusUpdate
567
-- | Common WaitForJobChange functionality shared between 'handleCall' and
-- 'handleWaitForJobChangeStatus'.
--
-- If the job cannot be loaded, fails with 'JobLost'. If the job is already
-- finalized, @compute_fn@ is run once and its result returned immediately.
-- Otherwise the live job file is watched with 'watchFile', starting from
-- @(prev_job, [])@, with the timeout capped at 'C.luxiWfjcTimeout'.
waitForJobChange :: JobId -> JSValue -> Int -> IO (JSValue, JSValue)
                 -> IO (ErrorResult JSValue)
waitForJobChange jid prev_job tmout compute_fn = do
  qDir <- queueDir
  loaded <- loadJobFromDisk qDir False jid
  case loaded of
    Bad loadErr -> return . Bad $ JobLost loadErr
    Ok (job, _)
      | jobFinalized job -> Ok . showJSON <$> compute_fn
      | otherwise -> do
          let cappedTmout = min tmout C.luxiWfjcTimeout
          answer <- watchFile (liveJobFile qDir jid) cappedTmout
                              (prev_job, JSArray []) compute_fn
          return . Ok $ showJSON answer
584
-- | Queries the requested fields of a job, together with its log entries
-- newer than the given log number.
--
-- If the query does not return the expected shape, every requested field
-- is answered with null and no log entries are returned.
computeJobUpdate :: ConfigData -> JobId -> [String] -> JSValue
                 -> IO (JSValue, JSValue)
computeJobUpdate cfg jid fields prev_log = do
  let jobIdStr = show $ fromJobId jid
  logDebug $ "Inspecting fields " ++ show fields ++ " of job " ++ jobIdStr
  jobQuery <- handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
                [Right . fromIntegral $ fromJobId jid] ("oplog" : fields) False
  let (rfields, rlogs) =
        case jobQuery of
          Ok (JSArray [JSArray (JSArray logs : answer)]) ->
            (answer, newerLogs logs)
          _ -> (map (const JSNull) fields, JSArray [])
  logDebug $ "Updates for job " ++ jobIdStr ++ " are "
             ++ encode (rfields, rlogs)
  return (JSArray rfields, rlogs)
  where
    -- Flatten the per-opcode log lists and keep the entries that count as
    -- newer than prev_log: with a null prev_log every array entry is kept,
    -- otherwise only entries whose leading number exceeds prev_log.
    newerLogs logs = JSArray . filter isNewer $ concatMap entriesOf logs
    entriesOf (JSArray xs) = xs
    entriesOf _ = []
    isNewer entry = case (prev_log, entry) of
      (JSNull, JSArray _) -> True
      (JSRational _ n, JSArray (JSRational _ m : _)) -> n < m
      _ -> False
606
-- | A version of 'computeJobUpdate' hardcoded to return only the job's
-- status and its log entries newer than @prev_log@.
--
-- The job is read directly from the queue directory, so neither the luxi
-- query infrastructure nor a 'ConfigData' value is needed. If the job
-- cannot be loaded, the status is null and no log entries are returned.
computeJobUpdateStatus :: JobId -> JSValue -> IO (JSValue, JSValue)
computeJobUpdateStatus jid prev_log = do
  qdir <- queueDir
  loadResult <- loadJobFromDisk qdir True jid
  let sjid = show $ fromJobId jid
  logDebug $ "Inspecting status of job " ++ sjid
  let -- If prev_log parses, keep only entries whose index is greater than
      -- it; otherwise keep every entry.
      isNewer = case J.readJSON prev_log of
        J.Ok n -> \(idx, _time, _type, _msg) -> n < idx
        _ -> const True
      (rfields, rlogs) = case loadResult of
        Ok (job, _) ->
          -- same data as the "status" and "oplog" job fields
          ( J.JSArray [showJSON $ calcJobStatus job]
          , showJSON . concatMap (filter isNewer . qoLog) $ qjOps job
          )
        _ -> (JSArray [JSNull], JSArray [])
  logDebug $ "Updates for job " ++ sjid ++ " are " ++ encode (rfields, rlogs)
  return (rfields, rlogs)
627
-- | State handed to the luxi request handlers: the job queue lock, the
-- job queue status and the action reading the current configuration.
type LuxiConfig = (Lock, JQStatus, ConfigReader)

-- | Executes a single luxi request; the boolean in the result is always
-- 'True'.
luxiExec
  :: LuxiConfig
  -> LuxiOp
  -> IO (Bool, GenericResult GanetiException JSValue)
luxiExec (qlock, qstat, creader) args = (,) True <$> dispatch args
  where
    -- Special case WaitForJobChange handling to avoid passing a ConfigData
    -- to a potentially long-lived thread. ConfigData uses lots of heap, and
    -- multiple handler threads retaining different versions of ConfigData
    -- increases luxi's memory use for concurrent jobs that modify config.
    dispatch (WaitForJobChange jid fields prev_job prev_log tmout)
      | fields == ["status"] =
          handleWaitForJobChangeStatus jid prev_job prev_log tmout
    dispatch op = creader >>= \cfg -> handleCallWrapper qlock qstat cfg op
648
-- | The 'U.Handler' used by the luxi listener: requests are parsed with
-- 'decodeLuxiCall', logged with 'strOfOp' (short form) and 'show' (long
-- form), and executed by 'luxiExec'.
luxiHandler :: LuxiConfig -> U.Handler LuxiOp IO JSValue
luxiHandler cfg =
  U.Handler
    { U.hExec = luxiExec cfg
    , U.hParse = decodeLuxiCall
    , U.hInputLogShort = strOfOp
    , U.hInputLogLong = show
    }
655
-- | Type alias for prepMain results: the bound luxi server, the reference
-- holding the current configuration (or why there is none), and the job
-- queue status.
type PrepResult = (Server, IORef (Result ConfigData), JQStatus)

-- | Activate the master IP address.
--
-- Loads the cluster configuration from disk and sends
-- 'RpcCallNodeActivateMasterIp' to the nodes returned by
-- 'Config.getMasterNodes'. The RPC results are passed to 'logRpcErrors' and
-- otherwise ignored; only a failure to load the configuration is returned.
activateMasterIP :: IO (Result ())
activateMasterIP = runResultT $ do
  liftIO $ logDebug "Activating master IP address"
  config <- mkResultT $ Config.loadConfig =<< Path.clusterConfFile
  let netParams = Config.getMasterNetworkParameters config
      masterNodes = Config.getMasterNodes config
      useExtScript = clusterUseExternalMipScript $ configCluster config
  liftIO $ do
    logDebug $ "Master IP params: " ++ show netParams
    rpcResults <- executeRpcCall masterNodes $
      RpcCallNodeActivateMasterIp netParams useExtScript
    _ <- logRpcErrors rpcResults
    logDebug "finished activating master IP address"
673
-- | Check function for luxid: delegates to
-- 'handleMasterVerificationOptions'.
checkMain :: CheckFn ()
checkMain = handleMasterVerificationOptions

-- | Prepare function for luxid.
--
-- Exits unless 'Exec.isForkSupported' holds, runs 'cleanupSocket' on the
-- Luxi socket path and binds the Luxi server there, then creates the
-- configuration reference (initially "not yet loaded") and the job queue
-- status built on it.
prepMain :: PrepFn () PrepResult
prepMain _ _ = do
  forkOk <- Exec.isForkSupported
  exitUnless forkOk "The daemon must be compiled without -threaded"
  socketPath <- Path.defaultQuerySocket
  cleanupSocket socketPath
  luxiServer <- describeError "binding to the Luxi socket" Nothing
                  (Just socketPath) (getLuxiServer True socketPath)
  configRef <- newIORef (Bad "Configuration not yet loaded")
  queueStatus <- emptyJQStatus configRef
  return (luxiServer, configRef, queueStatus)
691
-- | Main function.
--
-- Subscribes to configuration updates, takes the job-queue lock file,
-- activates the master IP in a background thread, starts the job
-- scheduler and then serves luxi requests forever. When the listener loop
-- ends (e.g. by an exception), the server is closed and the job-queue lock
-- file is removed.
main :: MainFn () PrepResult
main _ _ (server, cref, jq) = do
  -- Subscribe to config updates. If the config changes, write new config and
  -- check if the changes should trigger the scheduler.
  initConfigReader $ \newConfig -> do

    runScheduler <- atomicModifyIORef cref $ \oldConfig ->
      case (oldConfig, newConfig) of
        (Ok old, Ok new) -> (newConfig, configChangeNeedsRescheduling old new)
        _ -> (newConfig, True) -- no old or new config, schedule

    when runScheduler (updateStatusAndScheduleSomeJobs jq)

  let creader = readIORef cref

  qlockFile <- jobQueueLockFile
  _ <- lockFile qlockFile >>= exitIfBad "Failed to obtain the job-queue lock"
  qlock <- newLock

  -- Ignore SIGCHLD. NOTE(review): presumably so that terminated child
  -- processes are reaped automatically (POSIX SIG_IGN semantics) — confirm.
  _ <- P.installHandler P.sigCHLD P.Ignore Nothing

  -- Runs in the background; its result is discarded.
  _ <- forkIO . void $ activateMasterIP

  initJQScheduler jq

  finally
    (forever $ U.listener (luxiHandler (qlock, jq, creader)) server)
    (closeServer server >> removeFile qlockFile)