Implement predictive queue cluster parameter
[ganeti-github.git] / src / Ganeti / Query / Server.hs
1 {-# LANGUAGE FlexibleContexts #-}
2
3 {-| Implementation of the Ganeti Query2 server.
4
5 -}
6
7 {-
8
9 Copyright (C) 2012, 2013, 2014 Google Inc.
10 All rights reserved.
11
12 Redistribution and use in source and binary forms, with or without
13 modification, are permitted provided that the following conditions are
14 met:
15
16 1. Redistributions of source code must retain the above copyright notice,
17 this list of conditions and the following disclaimer.
18
19 2. Redistributions in binary form must reproduce the above copyright
20 notice, this list of conditions and the following disclaimer in the
21 documentation and/or other materials provided with the distribution.
22
23 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
24 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
25 TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
26 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
27 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
28 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
29 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34
35 -}
36
37 module Ganeti.Query.Server
38 ( main
39 , checkMain
40 , prepMain
41 ) where
42
43 import Prelude ()
44 import Ganeti.Prelude
45
46 import Control.Concurrent
47 import Control.Exception
48 import Control.Lens ((.~))
49 import Control.Monad (forever, when, mzero, guard, zipWithM, liftM, void)
50 import Control.Monad.Base (MonadBase, liftBase)
51 import Control.Monad.Error.Class (MonadError)
52 import Control.Monad.IO.Class
53 import Control.Monad.Trans (lift)
54 import Control.Monad.Trans.Maybe
55 import qualified Data.ByteString.UTF8 as UTF8
56 import qualified Data.Set as Set (toList)
57 import Data.IORef
58 import Data.List (intersperse)
59 import Data.Maybe (fromMaybe)
60 import qualified Text.JSON as J
61 import Text.JSON (encode, showJSON, JSValue(..))
62 import System.Info (arch)
63 import System.Directory
64 import System.Posix.Process (getProcessID)
65 import System.Posix.Signals as P
66
67 import qualified Ganeti.Constants as C
68 import qualified Ganeti.ConstantUtils as ConstantUtils (unFrozenSet)
69 import Ganeti.Errors
70 import qualified Ganeti.Path as Path
71 import Ganeti.Daemon
72 import Ganeti.Daemon.Utils (handleMasterVerificationOptions)
73 import Ganeti.Objects
74 import Ganeti.Objects.Lens (configFiltersL)
75 import qualified Ganeti.Config as Config
76 import qualified Ganeti.Compat as Compat
77 import Ganeti.ConfigReader
78 import Ganeti.BasicTypes
79 import Ganeti.JQueue
80 import Ganeti.JQScheduler
81 import Ganeti.JSON (TimeAsDoubleJSON(..), alterContainerL, lookupContainer)
82 import Ganeti.Locking.Locks (ClientId(..), ClientType(ClientOther))
83 import Ganeti.Logging
84 import Ganeti.Luxi
85 import qualified Ganeti.Query.Language as Qlang
86 import qualified Ganeti.Query.Cluster as QCluster
87 import Ganeti.Path ( queueDir, jobQueueLockFile, jobQueueDrainFile )
88 import Ganeti.Rpc
89 import qualified Ganeti.Query.Exec as Exec
90 import Ganeti.Query.Query
91 import Ganeti.Query.Filter (makeSimpleFilter)
92 import Ganeti.THH.HsRPC (runRpcClient, RpcClientMonad)
93 import Ganeti.Types
94 import qualified Ganeti.UDSServer as U (Handler(..), listener)
95 import Ganeti.Utils ( lockFile, exitIfBad, exitUnless, watchFile
96 , safeRenameFile, newUUID, isUUID )
97 import Ganeti.Utils.Monad (orM)
98 import Ganeti.Utils.MVarLock
99 import qualified Ganeti.Version as Version
100 import Ganeti.WConfd.Client ( getWConfdClient, withLockedConfig, writeConfig
101 , cleanupLocks)
102
103
-- | Builds the 'ClientId' under which the calling luxid thread identifies
-- itself.
--
-- The identifier embeds the id of the current thread, so a different
-- 'ClientId' is produced for each request handled by luxid. The process id
-- and the livelock file of the job queue status are recorded alongside it.
makeLuxidClientId :: JQStatus -> IO ClientId
makeLuxidClientId status = do
  threadId <- myThreadId
  processId <- getProcessID
  let identifier = ClientOther ("luxid-" ++ show threadId)
  return ClientId { ciIdentifier = identifier
                  , ciLockFile = jqLivelock status
                  , ciPid = processId
                  }
118
-- | Opens a client connection to WConfd on its default socket and runs the
-- given RPC action with the configuration locked (via 'withLockedConfig').
--
-- Luxid uses this whenever it has to modify the configuration.
--
-- Example:
--
-- > cid <- makeLuxidClientId ...
-- > withLockedWconfdConfig cid $ \lockedCfg -> do
-- >   -- some (IO) action that needs to be run while holding the lock
-- >   writeConfig cid (updateConfig lockedCfg)
withLockedWconfdConfig
  :: (MonadBase IO m, MonadError GanetiException m)
  => ClientId
  -> (ConfigData -> RpcClientMonad a)
  -> m a
withLockedWconfdConfig cid f =
  liftBase (Path.defaultWConfdSocket >>= getWConfdClient)
    >>= runRpcClient (withLockedConfig cid False f)
138
-- | Shared implementation of the classic (pre-query2) queries.
--
-- The requested names are turned into one simple filter per supplied filter
-- field; these are OR-ed together and the resulting query is run against the
-- configuration. Sync queries are rejected with an 'OpPrereqError'.
handleQuery :: [Qlang.ItemType -> Qlang.FilterField] -- ^ Fields to put into
                                                     -- the query
            -> ConfigData               -- ^ Cluster config
            -> Qlang.ItemType           -- ^ Query type
            -> [Either String Integer]  -- ^ Requested names
                                        -- (empty means all)
            -> [String]                 -- ^ Requested fields
            -> Bool                     -- ^ Whether to do sync queries or not
            -> IO (GenericResult GanetiException JSValue)
handleQuery filterFields cfg qkind names fields syncRequested
  | syncRequested =
      return . Bad $ OpPrereqError "Sync queries are not allowed" ECodeInval
  | otherwise = do
      let nameFilters = [ makeSimpleFilter (mkField qkind) names
                        | mkField <- filterFields ]
          fullQuery = Qlang.Query qkind fields (Qlang.OrFilter nameFilters)
      queryResult <- query cfg True fullQuery
      return . fmap showJSON $ queryResult >>= queryCompat
156
-- | Classic query that matches the requested names against both the
-- @name@ and the @uuid@ field of the queried items.
handleClassicQuery :: ConfigData               -- ^ Cluster config
                   -> Qlang.ItemType           -- ^ Query type
                   -> [Either String Integer]  -- ^ Requested names
                                               -- (empty means all)
                   -> [String]                 -- ^ Requested fields
                   -> Bool                     -- ^ Whether to do sync queries
                                               -- or not
                   -> IO (GenericResult GanetiException JSValue)
handleClassicQuery cfg qkind names fields sync =
  handleQuery [nameField, uuidField] cfg qkind names fields sync
167
-- | Variant of 'handleClassicQuery' that matches the requested names
-- against the @uuid@ field only.
handleUuidQuery :: ConfigData               -- ^ Cluster config
                -> Qlang.ItemType           -- ^ Query type
                -> [Either String Integer]  -- ^ Requested names
                                            -- (empty means all)
                -> [String]                 -- ^ Requested fields
                -> Bool                     -- ^ Whether to do sync queries
                                            -- or not
                -> IO (GenericResult GanetiException JSValue)
handleUuidQuery cfg qkind names fields sync =
  handleQuery [uuidField] cfg qkind names fields sync
177
-- | Thin layer in front of 'handleCall' that deals with an unavailable
-- configuration: without a valid 'ConfigData' the request is answered with a
-- 'ConfigurationError', otherwise it is passed on to 'handleCall'.
handleCallWrapper :: Lock -> JQStatus -> Result ConfigData
                  -> LuxiOp -> IO (ErrorResult JSValue)
handleCallWrapper qlock qstat cfgResult op =
  case cfgResult of
    Ok config -> handleCall qlock qstat config op
    Bad msg ->
      return . Bad . ConfigurationError $
        "I do not have access to a valid configuration, cannot"
        ++ " process queries: " ++ msg
186
-- | Actual luxi operation handler.
--
-- Dispatches on the 'LuxiOp' constructor. The arguments are the job queue
-- lock, the job queue status, the cluster configuration and the operation to
-- execute; the answer is the JSON-encoded result or a 'GanetiException'.
handleCall :: Lock -> JQStatus
           -> ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
-- QueryClusterInfo: report the cluster-wide settings as one JSON object.
handleCall _ _ cdata QueryClusterInfo =
  -- The object is assembled only once the master node name is known, so no
  -- placeholder value is needed for the failure case; a failed master lookup
  -- is passed on to the caller unchanged.
  return $ J.makeObj . clusterInfo <$> QCluster.clusterMasterNodeName cdata
  where
    cluster = configCluster cdata
    maintenance = configMaintenance cdata
    hypervisors = clusterEnabledHypervisors cluster
    diskTemplates = clusterEnabledDiskTemplates cluster
    -- The first enabled hypervisor is reported as the default one.
    def_hv = case hypervisors of
               x:_ -> showJSON x
               [] -> JSNull
    bits = show (Compat.finiteBitSize (0::Int)) ++ "bits"
    arch_tuple = [bits, arch]
    -- All fields of the answer, given the name of the master node.
    clusterInfo masterName =
      [ ("software_version", showJSON C.releaseVersion)
      , ("protocol_version", showJSON C.protocolVersion)
      , ("config_version", showJSON C.configVersion)
      , ("os_api_version", showJSON . maximum .
                           Set.toList . ConstantUtils.unFrozenSet $
                           C.osApiVersions)
      , ("export_version", showJSON C.exportVersion)
      , ("vcs_version", showJSON Version.version)
      , ("architecture", showJSON arch_tuple)
      , ("name", showJSON $ clusterClusterName cluster)
      , ("master", showJSON masterName)
      , ("default_hypervisor", def_hv)
      , ("enabled_hypervisors", showJSON hypervisors)
      , ("hvparams", showJSON $ clusterHvparams cluster)
      , ("os_hvp", showJSON $ clusterOsHvp cluster)
      , ("beparams", showJSON $ clusterBeparams cluster)
      , ("osparams", showJSON $ clusterOsparams cluster)
      , ("ipolicy", showJSON $ clusterIpolicy cluster)
      , ("nicparams", showJSON $ clusterNicparams cluster)
      , ("ndparams", showJSON $ clusterNdparams cluster)
      , ("diskparams", showJSON $ clusterDiskparams cluster)
      , ("candidate_pool_size",
         showJSON $ clusterCandidatePoolSize cluster)
      , ("max_running_jobs",
         showJSON $ clusterMaxRunningJobs cluster)
      , ("max_tracked_jobs",
         showJSON $ clusterMaxTrackedJobs cluster)
      , ("mac_prefix", showJSON $ clusterMacPrefix cluster)
      , ("master_netdev", showJSON $ clusterMasterNetdev cluster)
      , ("master_netmask", showJSON $ clusterMasterNetmask cluster)
      , ("use_external_mip_script",
         showJSON $ clusterUseExternalMipScript cluster)
      , ("volume_group_name",
         maybe JSNull showJSON (clusterVolumeGroupName cluster))
      , ("drbd_usermode_helper",
         maybe JSNull showJSON (clusterDrbdUsermodeHelper cluster))
      , ("file_storage_dir", showJSON $ clusterFileStorageDir cluster)
      , ("shared_file_storage_dir",
         showJSON $ clusterSharedFileStorageDir cluster)
      , ("gluster_storage_dir",
         showJSON $ clusterGlusterStorageDir cluster)
      , ("maintain_node_health",
         showJSON $ clusterMaintainNodeHealth cluster)
      , ("ctime", showJSON . TimeAsDoubleJSON $ clusterCtime cluster)
      , ("mtime", showJSON . TimeAsDoubleJSON $ clusterMtime cluster)
      , ("uuid", showJSON $ clusterUuid cluster)
      , ("tags", showJSON $ clusterTags cluster)
      , ("uid_pool", showJSON $ clusterUidPool cluster)
      , ("default_iallocator",
         showJSON $ clusterDefaultIallocator cluster)
      , ("default_iallocator_params",
         showJSON $ clusterDefaultIallocatorParams cluster)
      , ("reserved_lvs", showJSON $ clusterReservedLvs cluster)
      , ("primary_ip_version",
         showJSON . ipFamilyToVersion $ clusterPrimaryIpFamily cluster)
      , ("prealloc_wipe_disks",
         showJSON $ clusterPreallocWipeDisks cluster)
      , ("hidden_os", showJSON $ clusterHiddenOs cluster)
      , ("blacklisted_os", showJSON $ clusterBlacklistedOs cluster)
      , ("enabled_disk_templates", showJSON diskTemplates)
      , ("install_image", showJSON $ clusterInstallImage cluster)
      , ("instance_communication_network",
         showJSON (clusterInstanceCommunicationNetwork cluster))
      , ("zeroing_image", showJSON $ clusterZeroingImage cluster)
      , ("compression_tools",
         showJSON $ clusterCompressionTools cluster)
      , ("enabled_user_shutdown",
         showJSON $ clusterEnabledUserShutdown cluster)
      , ("enabled_data_collectors",
         showJSON . fmap dataCollectorActive
                  $ clusterDataCollectors cluster)
      , ("data_collector_interval",
         showJSON . fmap dataCollectorInterval
                  $ clusterDataCollectors cluster)
      , ("diagnose_data_collector_filename",
         showJSON $ clusterDiagnoseDataCollectorFilename cluster)
      , ("maint_round_delay",
         showJSON $ maintRoundDelay maintenance)
      , ("maint_balance",
         showJSON $ maintBalance maintenance)
      , ("maint_balance_threshold",
         showJSON $ maintBalanceThreshold maintenance)
      , ("hv_state",
         showJSON $ clusterHvStateStatic cluster)
      , ("disk_state",
         showJSON $ clusterDiskStateStatic cluster)
      , ("modify_ssh_setup",
         showJSON $ clusterModifySshSetup cluster)
      , ("ssh_key_type", showJSON $ clusterSshKeyType cluster)
      , ("ssh_key_bits", showJSON $ clusterSshKeyBits cluster)
      , ("enabled_predictive_queue",
         showJSON $ clusterEnabledPredictiveQueue cluster)
      ]
299
-- QueryTags: return the tags of the cluster itself or of the named group,
-- node, instance or network; a failed lookup of the named object is
-- returned as the error.
handleCall _ _ cfg (QueryTags kind name) =
  return . fmap J.showJSON $ case kind of
    TagKindCluster  -> Ok (clusterTags (configCluster cfg))
    TagKindGroup    -> fmap groupTags (Config.getGroup cfg name)
    TagKindNode     -> fmap nodeTags (Config.getNode cfg name)
    TagKindInstance -> fmap instTags (Config.getInstance cfg name)
    TagKindNetwork  -> fmap networkTags (Config.getNetwork cfg name)
308
-- Query: run a query2 query against the configuration and encode the
-- outcome as JSON.
handleCall _ _ cfg (Query qkind qfields qfilter) =
  fmap J.showJSON <$> query cfg True (Qlang.Query qkind qfields qfilter)
312
-- QueryFields: describe the fields available for the given item type; the
-- configuration is not needed for this.
handleCall _ _ _ (QueryFields qkind qfields) =
  return . fmap J.showJSON . queryFields $ Qlang.QueryFields qkind qfields
316
-- QueryNodes: classic query over nodes, selected by name or UUID.
handleCall _ _ cfg (QueryNodes names fields lock) =
  handleClassicQuery cfg nodeKind (Left <$> names) fields lock
  where nodeKind = Qlang.ItemTypeOpCode Qlang.QRNode

-- QueryInstances: classic query over instances, selected by name or UUID.
handleCall _ _ cfg (QueryInstances names fields lock) =
  handleClassicQuery cfg instanceKind (Left <$> names) fields lock
  where instanceKind = Qlang.ItemTypeOpCode Qlang.QRInstance

-- QueryGroups: classic query over node groups, selected by name or UUID.
handleCall _ _ cfg (QueryGroups names fields lock) =
  handleClassicQuery cfg groupKind (Left <$> names) fields lock
  where groupKind = Qlang.ItemTypeOpCode Qlang.QRGroup

-- QueryJobs: classic query over jobs, selected by numeric job id; never a
-- sync query.
handleCall _ _ cfg (QueryJobs names fields) =
  handleClassicQuery cfg jobKind jobNumbers fields False
  where jobKind = Qlang.ItemTypeLuxi Qlang.QRJob
        jobNumbers = [ Right (fromIntegral (fromJobId jid)) | jid <- names ]

-- QueryFilters: query over filter rules, selected by UUID only; never a
-- sync query.
handleCall _ _ cfg (QueryFilters uuids fields) =
  handleUuidQuery cfg filterKind (Left <$> uuids) fields False
  where filterKind = Qlang.ItemTypeLuxi Qlang.QRFilter
336
-- ReplaceFilter: add a new filter rule (no UUID given) or replace the rule
-- stored under the given UUID. Answers with the UUID of the rule.
handleCall _ status _ (ReplaceFilter mUuid priority predicates action
                         reason) =
  runResultT $ do

    -- A caller-supplied identifier must be a well-formed UUID; without one a
    -- fresh UUID is generated for the new rule.
    uuid <- case mUuid of
      Just given
        | isUUID given -> return given
        | otherwise -> fail "Unable to parse UUID"
      Nothing -> liftIO newUUID

    now <- liftIO currentTimestamp
    let luxidReason = ("luxid", "", reasonTrailTimestamp now)
        filterKey = UTF8.fromString uuid

    -- The change itself is carried out by WConfd, under the config lock.
    cid <- liftIO $ makeLuxidClientId status
    withLockedWconfdConfig cid $ \lockedCfg -> do
      -- The job serial is read only after the lock was obtained, so that the
      -- watermark is as recent as possible (locking may take a while).
      serial <- liftIO readSerialFromDisk
      watermark <- case serial of
        Bad err -> fail $ "AddFilter: reading current JobId failed: " ++ err
        Ok current -> return current
      let rule = FilterRule { frWatermark = watermark
                            , frPriority = priority
                            , frPredicates = predicates
                            , frAction = action
                            , frReasonTrail = reason ++ [luxidReason]
                            , frUuid = filterKey
                            }
      writeConfig cid $
        (configFiltersL . alterContainerL filterKey .~ Just rule) lockedCfg

    -- Tell the caller under which UUID the rule is now stored.
    return $ showJSON uuid
375
-- DeleteFilter: remove the filter rule stored under the given UUID; fails
-- if the configuration at hand contains no such rule.
handleCall _ status cfg (DeleteFilter uuid) = runResultT $ do
  let filterKey = UTF8.fromString uuid

  -- Refuse to delete a rule that is not there.
  _ <- lookupContainer
         (failError $ "Filter rule with UUID " ++ uuid ++ " does not exist")
         filterKey
         (configFilters cfg)

  -- The change itself is carried out by WConfd, under the config lock.
  cid <- liftIO $ makeLuxidClientId status
  withLockedWconfdConfig cid $ \lockedCfg ->
    writeConfig cid $
      (configFiltersL . alterContainerL filterKey .~ Nothing) lockedCfg

  return JSNull
391
-- QueryNetworks: classic query over networks, selected by name or UUID.
handleCall _ _ cfg (QueryNetworks names fields lock) =
  handleClassicQuery cfg networkKind (Left <$> names) fields lock
  where networkKind = Qlang.ItemTypeOpCode Qlang.QRNetwork
395
-- QueryConfigValues: answer with one value per requested field, in request
-- order; fields that are not known yield JSON null.
handleCall _ _ cfg (QueryConfigValues fields) =
  Ok . showJSON <$> mapM valueOf fields
  where
    -- JSON encoding of a single property of the cluster object.
    clusterProperty fn = showJSON . fn . configCluster $ cfg
    -- The supported fields, each with the action computing its value.
    params = [ ("cluster_name", return $ clusterProperty clusterClusterName)
             , ("watcher_pause", liftM (maybe JSNull showJSON)
                                   QCluster.isWatcherPaused)
             , ("master_node", return . genericResult (const JSNull) showJSON
                                 $ QCluster.clusterMasterNodeName cfg)
             , ("drain_flag", liftM (showJSON . not) isQueueOpen)
             , ("modify_ssh_setup",
                return $ clusterProperty clusterModifySshSetup)
             , ("ssh_key_type", return $ clusterProperty clusterSshKeyType)
             , ("ssh_key_bits", return $ clusterProperty clusterSshKeyBits)
             ] :: [(String, IO JSValue)]
    valueOf field = fromMaybe (return JSNull) (lookup field params)
412
-- QueryExports: classic query over exports, always returning the "node" and
-- "export" fields.
handleCall _ _ cfg (QueryExports nodes lock) =
  handleClassicQuery cfg exportKind (Left <$> nodes) ["node", "export"] lock
  where exportKind = Qlang.ItemTypeOpCode Qlang.QRExport
416
-- SubmitJobToDrainedQueue: submit a job without looking at the drain flag.
-- The job gets a fresh id, is written and replicated, and is then handed to
-- the scheduler in a separate thread. Answers with the new job id.
handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops) = runResultT $ do
  let candidates = Config.getMasterCandidates cfg
  jid <- mkResultT (allocateJobId candidates qlock)
  received <- liftIO currentTimestamp
  fresh <- queuedJobFromOpCodes jid ops
  let job = extendJobReasonTrail (setReceivedTimestamp received fresh)
  dir <- liftIO queueDir
  _ <- writeAndReplicateJob cfg dir job
  _ <- liftIO (forkIO (enqueueNewJobs qstat [job]))
  return (showJSON (fromJobId jid))
426
-- SubmitJob: like SubmitJobToDrainedQueue, but refused with "Queue drained"
-- while the queue is not open.
handleCall qlock qstat cfg (SubmitJob ops) =
  isQueueOpen >>= \open ->
    if open
      then handleCall qlock qstat cfg (SubmitJobToDrainedQueue ops)
      else return . Bad . GenericError $ "Queue drained"
433
-- SubmitManyJobs: submit several jobs at once, unless the queue is not open.
-- The answer holds one (success, job id or error message) pair per job.
handleCall qlock qstat cfg (SubmitManyJobs lops) = do
  open <- isQueueOpen
  if open
    then allocateJobIds mcs qlock (length lops)
           >>= genericResult (return . Bad . GenericError) submitWithIds
    else return . Bad . GenericError $ "Queue drained"
  where
    mcs = Config.getMasterCandidates cfg

    -- Write, replicate and enqueue the jobs under the allocated ids.
    submitWithIds jids = do
      ts <- currentTimestamp
      rawJobs <- zipWithM queuedJobFromOpCodes jids lops
      let jobs = map (extendJobReasonTrail . setReceivedTimestamp ts) rawJobs
      qDir <- queueDir
      writeResults <- mapM (writeJobToDisk qDir) jobs
      let outcomes = zip writeResults jobs
          -- Only jobs that made it to disk are replicated and scheduled.
          succeeded = [ job | (res, job) <- outcomes, isOk res ]
      when (any isBad writeResults) . logWarning
        $ "Writing some jobs failed " ++ show outcomes
      replicateManyJobs qDir mcs succeeded
      _ <- forkIO $ enqueueNewJobs qstat succeeded
      return . Ok . JSArray $ map report outcomes

    -- Per-job entry of the answer.
    report (res, job)
      | isOk res = showJSON (True, fromJobId $ qjId job)
      | otherwise = showJSON (False, genericResult id (const "") res)
462
-- WaitForJobChange: general case, computing the update for arbitrary fields
-- through the query machinery.
handleCall _ _ cfg (WaitForJobChange jid fields prev_job prev_log tmout) =
  waitForJobChange jid prev_job tmout
    (computeJobUpdate cfg jid fields prev_log)
465
-- SetWatcherPause: forward the pause setting by RPC to the master and the
-- master candidates, then echo it back (JSON null if there is none). The
-- outcome of the RPC is not inspected.
handleCall _ _ cfg (SetWatcherPause time) = do
  _ <- executeRpcCall (Config.getMasterOrCandidates cfg)
         (RpcCallSetWatcherPause time)
  return . Ok $ maybe JSNull (showJSON . TimeAsDoubleJSON) time
470
-- SetDrainFlag: set or clear the queue drain flag, which is represented by
-- the presence of the drain file, and forward the new value by RPC to the
-- master candidates. The outcome of the RPC is not inspected.
handleCall _ _ cfg (SetDrainFlag value) = do
  let mcs = Config.getMasterCandidates cfg
  fpath <- jobQueueDrainFile
  if value
    then writeFile fpath ""
    else do
      -- 'removeFile' raises an exception for a missing file; clearing a flag
      -- that is not set must succeed instead of failing the request.
      present <- doesFileExist fpath
      when present $ removeFile fpath
  _ <- executeRpcCall mcs $ RpcCallSetDrainFlag value
  return . Ok . showJSON $ True
479
-- ChangeJobPriority: change the priority of a job. If 'setJobPriority' hands
-- back the updated job, it is replicated to the master candidates; if it
-- hands back nothing, the job is signalled through 'tellJobPriority'.
handleCall _ qstat cfg (ChangeJobPriority jid prio) =
  setJobPriority qstat jid prio >>= \outcome -> case outcome of
    Bad s -> return . Ok $ showJSON (False, s)
    Ok Nothing -> do
      logDebug $ jName ++ " started, will signal"
      fmap showJSON <$> tellJobPriority (jqLivelock qstat) jid prio
    Ok (Just job) -> runResultT $ do
      qDir <- liftIO queueDir
      liftIO $ replicateManyJobs qDir (Config.getMasterCandidates cfg) [job]
      return $ showJSON (True, "Priorities of pending opcodes for "
                               ++ jName ++ " have been changed to "
                               ++ show prio)
  where
    jName = "job " ++ show (fromJobId jid)
495
-- CancelJob: cancel a job. A job that could be dequeued is marked as
-- cancelled in its job file; otherwise cancellation is attempted directly
-- through 'cancelJob'. The answer is a (success, message) pair.
handleCall _ qstat cfg (CancelJob jid kill) = do
  dequeueResult <- dequeueJob qstat jid
  case dequeueResult of
    Bad s -> return . Ok . showJSON $ (False, s)
    Ok True -> markCancelled
    Ok False -> cancelDirectly
  where
    jName = "job " ++ show (fromJobId jid)

    -- The job was dequeued: record the cancellation in the job file and
    -- report whether that worked.
    markCancelled = do
      outcome <- runResultT $ do
        logDebug $ jName ++ " dequeued, marking as canceled"
        qDir <- liftIO queueDir
        (job, _) <- ResultT $ loadJobFromDisk qDir True jid
        now <- liftIO currentTimestamp
        writeAndReplicateJob cfg qDir (cancelQueuedJob now job)
      return . Ok . showJSON $ case outcome of
        Bad err -> (False, "Dequeued " ++ jName
                           ++ ", but failed to mark as cancelled: " ++ err)
        Ok _ -> (True, "Dequeued " ++ jName)

    -- The job was not queued: cancel it directly. When killing was
    -- requested, a background thread polls 'cleanupIfDead' for the job and
    -- afterwards asks WConfd to clean up locks.
    cancelDirectly = do
      logDebug $ jName ++ " not queued; trying to cancel directly"
      result <- fmap showJSON <$> cancelJob kill (jqLivelock qstat) jid
      when kill . void . forkIO $ do
        _ <- orM
             . intersperse (threadDelay C.luxidJobDeathDelay >> return False)
             . replicate C.luxidJobDeathDetectionRetries
             $ cleanupIfDead qstat jid
        wconfdclient <- getWConfdClient =<< Path.defaultWConfdSocket
        void . runResultT $ runRpcClient cleanupLocks wconfdclient
      return result
526
-- ArchiveJob: move the file of a finalized job from its live location to its
-- archived location and ask the master candidates, by RPC, to perform the
-- same rename. The JSON answer is True when the job was archived and False
-- when the computation was cut short (job could not be loaded, or it is not
-- finalized yet). A failing rename is reported as a 'JobQueueError'.
527 handleCall qlock _ cfg (ArchiveJob jid) =
528 -- By adding a layer of MaybeT, we can prematurely end a computation
529 -- using 'mzero' or other 'MonadPlus' primitive and return 'Ok False'.
530 runResultT . liftM (showJSON . fromMaybe False) . runMaybeT $ do
531 qDir <- liftIO queueDir
532 let mcs = Config.getMasterCandidates cfg
533 live = liveJobFile qDir jid
534 archive = archivedJobFile qDir jid
-- NOTE(review): indentation was lost in this copy of the file, so it cannot
-- be told from here which of the following statements run while the queue
-- lock is held; confirm against the original source.
535 withLock qlock $ do
-- A job that cannot be loaded ends the computation early.
536 (job, _) <- (lift . mkResultT $ loadJobFromDisk qDir False jid)
537 `orElse` mzero
-- Only finalized jobs are archived; 'guard' ends the computation otherwise.
538 guard $ jobFinalized job
539 lift . withErrorT JobQueueError
540 . annotateError "Archiving failed in an unexpected way"
541 . mkResultT $ safeRenameFile queueDirPermissions live archive
-- The result of the rename RPC is discarded.
542 _ <- liftIO . executeRpcCall mcs
543 $ RpcCallJobqueueRename [(live, archive)]
544 return True
545
-- AutoArchiveJobs: list the job ids found in the queue directory and, while
-- holding the queue lock, run 'archiveJobs' over them in sorted order with
-- the given age and timeout. Failing to list the ids is a 'JobQueueError'.
handleCall qlock _ cfg (AutoArchiveJobs age timeout) = do
  qDir <- queueDir
  getJobIDs [qDir] >>= genericResult
    (return . Bad . JobQueueError . show)
    (\jids -> Ok . showJSON
                <$> withLock qlock
                      (archiveJobs cfg age timeout (sortJobIDs jids)))
556
-- PickupJob: always refused, as the call is for internal use only.
handleCall _ _ _ (PickupJob _) =
  return (Bad (GenericError "Luxi call 'PickupJob' is for internal use only"))
560
561 {-# ANN handleCall "HLint: ignore Too strict if" #-}
562
-- | Handler for the WaitForJobChange call in the special case where the
-- requested fields are exactly @["status"]@. No 'ConfigData' is needed on
-- this path.
handleWaitForJobChangeStatus :: JobId -> JSValue -> JSValue -> Int
                             -> IO (ErrorResult JSValue)
handleWaitForJobChangeStatus jid prev_job prev_log tmout =
  waitForJobChange jid prev_job tmout
    (computeJobUpdateStatus jid prev_log)
569
-- | WaitForJobChange logic shared by 'handleCall' and
-- 'handleWaitForJobChangeStatus'.
--
-- A job that cannot be loaded is reported as 'JobLost'. For a finalized job
-- the update is computed and returned right away; otherwise the live job
-- file is watched, for at most the smaller of the requested timeout and
-- 'C.luxiWfjcTimeout', using the supplied action to compute the update.
waitForJobChange :: JobId -> JSValue -> Int -> IO (JSValue, JSValue)
                 -> IO (ErrorResult JSValue)
waitForJobChange jid prev_job tmout compute_fn = do
  qDir <- queueDir
  loaded <- loadJobFromDisk qDir False jid
  case loaded of
    Bad s -> return . Bad $ JobLost s
    Ok (job, _)
      | jobFinalized job -> Ok . showJSON <$> compute_fn
      | otherwise ->
          Ok . showJSON
            <$> watchFile (liveJobFile qDir jid)
                  (min tmout C.luxiWfjcTimeout)
                  (prev_job, JSArray []) compute_fn
586
-- | Looks up the requested fields of a job, together with those of its log
-- entries that are newer than the given log number.
--
-- The data is obtained by a classic job query for the @oplog@ field plus
-- the requested fields. If the answer does not have the expected shape,
-- every field is reported as JSON null and the log list is empty.
computeJobUpdate :: ConfigData -> JobId -> [String] -> JSValue
                 -> IO (JSValue, JSValue)
computeJobUpdate cfg jid fields prev_log = do
  logDebug $ "Inspecting fields " ++ show fields ++ " of job " ++ sjid
  jobQuery <- handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
                [Right (fromIntegral (fromJobId jid))] ("oplog" : fields) False
  let (rfields, rlogs) = case jobQuery of
        Ok (JSArray [JSArray (JSArray logs : answer)]) ->
          (answer, newerLogs logs)
        _ -> ([ JSNull | _ <- fields ], JSArray [])
  logDebug $ "Updates for job " ++ sjid ++ " are " ++ encode (rfields, rlogs)
  return (JSArray rfields, rlogs)
  where
    sjid = show (fromJobId jid)

    -- Elements of a JSON array; anything else contributes nothing.
    entriesOf (JSArray xs) = xs
    entriesOf _ = []

    -- Whether a log entry is newer than the previously seen log number.
    isNewer entry = case (prev_log, entry) of
      (JSNull, JSArray _) -> True
      (JSRational _ n, JSArray (JSRational _ m : _)) -> n < m
      _ -> False

    newerLogs logs = JSArray . filter isNewer $ concatMap entriesOf logs
608
-- | Variant of 'computeJobUpdate' that always reports just the job status
-- and the new log entries. It reads the job file directly, so neither the
-- luxi query infrastructure nor a 'ConfigData' value is involved.
computeJobUpdateStatus :: JobId -> JSValue -> IO (JSValue, JSValue)
computeJobUpdateStatus jid prev_log = do
  qdir <- queueDir
  loadResult <- loadJobFromDisk qdir True jid
  logDebug $ "Inspecting status of job " ++ sjid
  let update = case loadResult of
        Ok (job, _) ->
          ( J.JSArray [showJSON $ calcJobStatus job] -- like "status" jobField
          , showJSON . filter isNew . concatMap qoLog $ qjOps job
          )                                          -- like "oplog" jobField
        _ -> (JSArray [JSNull], JSArray [])
  logDebug $ "Updates for job " ++ sjid ++ " are " ++ encode update
  return update
  where
    sjid = show (fromJobId jid)

    -- Keeps the entries whose index lies above the previous log number; if
    -- that number cannot be decoded, every entry counts as new.
    isNew = case J.readJSON prev_log of
      J.Ok n -> \(idx, _time, _type, _msg) -> n < idx
      _ -> const True
629
-- | What a luxi request handler works with: the job queue lock, the job
-- queue status and the action yielding the current configuration.
type LuxiConfig = (Lock, JQStatus, ConfigReader)

-- | Executes a single luxi operation. The flag in the result is always
-- 'True'.
luxiExec
    :: LuxiConfig
    -> LuxiOp
    -> IO (Bool, GenericResult GanetiException JSValue)
luxiExec (qlock, qstat, creader) args = do
  result <- case args of
    -- WaitForJobChange is special-cased so that no ConfigData is handed to a
    -- potentially long-lived thread. ConfigData uses lots of heap, and
    -- several handler threads retaining different versions of it increase
    -- luxi's memory use for concurrent jobs that modify the config.
    WaitForJobChange jid fields prev_job prev_log tmout
      | fields == ["status"] ->
          handleWaitForJobChangeStatus jid prev_job prev_log tmout
    _ -> creader >>= \cfg -> handleCallWrapper qlock qstat cfg args
  return (True, result)
650
-- | The UDS server handler for luxi requests: requests are decoded with
-- 'decodeLuxiCall', logged via 'strOfOp' (short form) or 'show' (long form),
-- and executed by 'luxiExec'.
luxiHandler :: LuxiConfig -> U.Handler LuxiOp IO JSValue
luxiHandler luxiCfg =
  U.Handler
    { U.hExec = luxiExec luxiCfg
    , U.hParse = decodeLuxiCall
    , U.hInputLogLong = show
    , U.hInputLogShort = strOfOp
    }
657
658 -- | Type alias for prepMain results: the luxi server, the reference
-- holding the most recently seen configuration (or the reason why there is
-- none), and the job queue status.
659 type PrepResult = (Server, IORef (Result ConfigData), JQStatus)
660
-- | Activates the master IP address.
--
-- The configuration is loaded from the cluster configuration file, and the
-- master nodes are asked by RPC to activate the master IP with the master
-- network parameters found there. RPC errors are only logged; a failure to
-- load the configuration is returned as the error.
activateMasterIP :: IO (Result ())
activateMasterIP = runResultT $ do
  liftIO $ logDebug "Activating master IP address"
  config <- mkResultT (Path.clusterConfFile >>= Config.loadConfig)
  let netParams = Config.getMasterNetworkParameters config
      useExternalScript = clusterUseExternalMipScript (configCluster config)
      activation = RpcCallNodeActivateMasterIp netParams useExternalScript
  liftIO . logDebug $ "Master IP params: " ++ show netParams
  res <- liftIO $ executeRpcCall (Config.getMasterNodes config) activation
  _ <- liftIO $ logRpcErrors res
  liftIO $ logDebug "finished activating master IP address"
675
676 -- | Check function for luxid.
--
-- Luxid adds no checks of its own; the work is delegated entirely to
-- 'handleMasterVerificationOptions'.
677 checkMain :: CheckFn ()
678 checkMain = handleMasterVerificationOptions
679
-- | Prepare function for luxid.
--
-- Exits unless forking is supported, then cleans up and binds the luxi
-- socket and creates the configuration reference (initially without a
-- configuration) together with an empty job queue status.
prepMain :: PrepFn () PrepResult
prepMain _ _ = do
  forkOk <- Exec.isForkSupported
  exitUnless forkOk "The daemon must be compiled without -threaded"

  socket_path <- Path.defaultQuerySocket
  cleanupSocket socket_path
  server <- describeError "binding to the Luxi socket" Nothing
              (Just socket_path) (getLuxiServer True socket_path)
  cref <- newIORef (Bad "Configuration not yet loaded")
  jq <- emptyJQStatus cref
  return (server, cref, jq)
693
694 -- | Main function.
--
-- Receives the server, the configuration reference and the job queue status
-- produced by 'prepMain'. It subscribes to configuration updates, takes the
-- job-queue lock file, starts activating the master IP in a separate thread,
-- initialises the job scheduler and then serves luxi requests forever. On
-- the way out the server is closed and the lock file removed.
695 main :: MainFn () PrepResult
696 main _ _ (server, cref, jq) = do
697 -- Subscribe to config updates. If the config changes, store the new config
698 -- and check if the changes should trigger the scheduler.
699 initConfigReader $ \newConfig -> do
700
-- Swap in the new configuration and decide, in the same atomic step,
-- whether the scheduler has to run.
701 runScheduler <- atomicModifyIORef cref $ \oldConfig ->
702 case (oldConfig, newConfig) of
703 (Ok old, Ok new) -> (newConfig, configChangeNeedsRescheduling old new)
704 _ -> (newConfig, True) -- no old or new config, schedule
705
706 when runScheduler (updateStatusAndScheduleSomeJobs jq)
707
-- Request handlers read whatever configuration is currently stored.
708 let creader = readIORef cref
709
-- Exit if the job-queue lock file cannot be obtained.
710 qlockFile <- jobQueueLockFile
711 _ <- lockFile qlockFile >>= exitIfBad "Failed to obtain the job-queue lock"
712 qlock <- newLock
713
-- SIGCHLD is ignored from here on.
714 _ <- P.installHandler P.sigCHLD P.Ignore Nothing
715
-- Activate the master IP in a separate thread; its result is discarded.
716 _ <- forkIO . void $ activateMasterIP
717
718 initJQScheduler jq
719
-- Serve requests; the cleanup action runs whenever the listener loop ends.
720 finally
721 (forever $ U.listener (luxiHandler (qlock, jq, creader)) server)
722 (closeServer server >> removeFile qlockFile)