Implement predictive queue cluster parameter
[ganeti-github.git] / src / Ganeti / JQScheduler.hs
1 {-# LANGUAGE RankNTypes #-}
2 {-| Implementation of a reader for the job queue.
3
4 -}
5
6 {-
7
8 Copyright (C) 2013 Google Inc.
9 All rights reserved.
10
11 Redistribution and use in source and binary forms, with or without
12 modification, are permitted provided that the following conditions are
13 met:
14
15 1. Redistributions of source code must retain the above copyright notice,
16 this list of conditions and the following disclaimer.
17
18 2. Redistributions in binary form must reproduce the above copyright
19 notice, this list of conditions and the following disclaimer in the
20 documentation and/or other materials provided with the distribution.
21
22 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
23 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
24 TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
25 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
26 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
27 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
28 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
29 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
30 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
31 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
32 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33
34 -}
35
36 module Ganeti.JQScheduler
37 ( JQStatus
38 , jqLivelock
39 , emptyJQStatus
40 , selectJobsToRun
41 , scheduleSomeJobs
42 , initJQScheduler
43 , enqueueNewJobs
44 , dequeueJob
45 , setJobPriority
46 , cleanupIfDead
47 , updateStatusAndScheduleSomeJobs
48 , configChangeNeedsRescheduling
49 ) where
50
51 import Prelude ()
52 import Ganeti.Prelude
53
54 import Control.Applicative (liftA2)
55 import Control.Arrow
56 import Control.Concurrent
57 import Control.Exception
58 import Control.Monad ( when
59 , mfilter
60 , liftM
61 , void
62 , unless
63 , forever
64 , forM_)
65 import Control.Monad.IO.Class
66 import Data.Function (on)
67 import Data.IORef (IORef, atomicModifyIORef, newIORef, readIORef)
68 import Data.List ( find
69 , deleteFirstsBy
70 , sortBy
71 , intercalate
72 , partition
73 , insertBy)
74 import Data.Maybe
75 import qualified Data.Map as Map
76 import Data.Ord (comparing)
77 import Data.Set (Set)
78 import qualified Data.Set as S
79 import System.INotify
80
81 import Ganeti.BasicTypes
82 import Ganeti.Compat
83 import Ganeti.Constants as C
84 import Ganeti.Errors
85 import Ganeti.JQScheduler.Filtering (applyingFilter, jobFiltering)
86 import Ganeti.JQScheduler.Types
87 import Ganeti.JQScheduler.ReasonRateLimiting (reasonRateLimit)
88 import Ganeti.JQueue as JQ
89 import Ganeti.JQueue.LockDecls
90 import Ganeti.JSON (fromContainer)
91 import Ganeti.Lens hiding (chosen)
92 import Ganeti.Logging
93 import Ganeti.Objects
94 import Ganeti.OpCodes
95 import Ganeti.Path
96 import Ganeti.Query.Exec (forkPostHooksProcess)
97 import Ganeti.Types
98 import Ganeti.Utils
99 import Ganeti.Utils.Livelock
100 import Ganeti.Utils.MVarLock
101
102
{-| Representation of the job queue status.

The jobs known to the scheduler are kept in a single 'Queue' value holding
three lists of jobs, each job together with the last fstat result observed:

  * 'qEnqueued': jobs that are enqueued, but not yet handed over for
    execution;

  * 'qRunning': jobs already handed over for execution;

  * 'qManipulated': running jobs temporarily taken out of 'qRunning'
    while an action is performed on them (see 'manipulateRunningJob').

All three lists live in a single 'IORef', so that they can be updated
atomically, in particular when scheduling jobs to be handed over for
execution.

-}
data JQStatus = JQStatus
  { jqJobs :: IORef Queue
    -- ^ The enqueued, running and manipulated jobs.
  , jqConfig :: IORef (Result ConfigData)
    -- ^ The current configuration, or the error obtained instead of it.
  , jqLivelock :: Livelock
    -- ^ The livelock of this scheduler (created in 'emptyJQStatus').
  , jqForkLock :: Lock
    -- ^ Lock passed to 'JQ.startJobs' and held while forking
    -- post-hooks processes.
  }
119
120
-- | Create a scheduler status with no jobs at all, a fresh livelock file
-- and a fresh fork lock, using the given configuration reference.
emptyJQStatus :: IORef (Result ConfigData) -> IO JQStatus
emptyJQStatus config = do
  let noJobs = Queue { qEnqueued = [], qRunning = [], qManipulated = [] }
  jobsRef <- newIORef noJobs
  (_, livelock) <- mkLivelockFile C.luxiLivelockPrefix
  lock <- newLock
  return JQStatus { jqJobs = jobsRef
                  , jqConfig = config
                  , jqLivelock = livelock
                  , jqForkLock = lock
                  }
128
-- When updating the job lists, force the elements to WHNF; otherwise it is
-- easy to leak the resources held onto by the lazily parsed job file.
-- This can happen, e.g., if updateJob is called but the resulting QueuedJob
-- isn't used by the scheduler, for example when the inotify watcher or the
-- polling loop re-reads a job with a new message appended to it.
134
135 -- | Apply a function on the running jobs.
136 onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
137 onRunningJobs f q@Queue { qRunning = qr } =
138 let qr' = (foldr seq () qr) `seq` f qr -- force list els to WHNF
139 in q { qRunning = qr' }
140
-- | Apply a function to the list of enqueued jobs. When the new list is
-- demanded, every element of the old list is forced to WHNF first.
onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onQueuedJobs f queue = queue { qEnqueued = forcedApply (qEnqueued queue) }
  where
    -- force list elements to WHNF before handing the list to f
    forcedApply jobs = foldr seq () jobs `seq` f jobs
146
-- | Wrap a 'QueuedJob' into a 'JobWithStat' that has no inotify handle
-- and the null fstat, i.e., one that has not been read from disk yet.
unreadJob :: QueuedJob -> JobWithStat
unreadJob = JobWithStat Nothing nullFStat
150
-- | Read a cluster parameter from the current configuration, falling back
-- to the given default when no configuration is available.
getConfigValue :: (Cluster -> a) -> a -> JQStatus -> IO a
getConfigValue param defaultvalue qstat = do
  cfg <- readIORef (jqConfig qstat)
  return $ genericResult (const defaultvalue) (param . configCluster) cfg
157
-- | Get the maximal number of jobs to be run simultaneously from the
-- configuration. If the configuration is not available, be conservative
-- and use the smallest possible value, i.e., 1.
getMaxRunningJobs :: JQStatus -> IO Int
getMaxRunningJobs = getConfigValue clusterMaxRunningJobs fallback
  where fallback = 1
163
-- | Get the maximal number of jobs to be tracked simultaneously from the
-- configuration. If the configuration is not available, be conservative
-- and use the smallest possible value, i.e., 1.
getMaxTrackedJobs :: JQStatus -> IO Int
getMaxTrackedJobs = getConfigValue clusterMaxTrackedJobs fallback
  where fallback = 1
169
-- | Whether the predictive queue scheduler is enabled in the cluster.
-- Without an available configuration, the predictive queue counts as
-- enabled.
getEnabledPredictiveQueue :: JQStatus -> IO Bool
getEnabledPredictiveQueue =
  getConfigValue clusterEnabledPredictiveQueue fallback
  where fallback = True
175
-- | The run queue length, i.e., how many jobs are currently in the
-- running part of the queue.
getRQL :: JQStatus -> IO Int
getRQL qstat = length . qRunning <$> readIORef (jqJobs qstat)
179
-- | Atomically (and strictly) apply a transformation to the jobs held in
-- the queue status.
modifyJobs :: JQStatus -> (Queue -> Queue) -> IO ()
modifyJobs qstat f = atomicModifyIORef' (jqJobs qstat) $ \queue -> (f queue, ())
183
-- | Reread a job from disk, if its file has changed.
--
-- Returns 'Nothing' if the file is unchanged or if rereading the job
-- failed; otherwise the job with the new fstat and contents. An error
-- while checking the file is treated as a change (with the null fstat).
-- The inotify handle of the job is carried over unchanged.
readJobStatus :: JobWithStat -> IO (Maybe JobWithStat)
readJobStatus jWS@JobWithStat { jStat = oldStat, jJob = oldJob } = do
  qdir <- queueDir
  let jid = qjId oldJob
      jids = show $ fromJobId jid
      fpath = liveJobFile qdir jid
  logDebug $ "Checking if " ++ fpath ++ " changed on disk."
  reloadCheck <- try (needsReload oldStat fpath)
                   :: IO (Either IOError (Maybe FStat))
  case either (const (Just nullFStat)) id reloadCheck of
    Nothing -> do
      logDebug $ "File " ++ fpath ++ " not changed on disk."
      return Nothing
    Just newStat -> do
      logDebug $ "Rereading job " ++ jids
      loaded <- loadJobFromDisk qdir True jid
      case loaded of
        Bad msg -> do
          logWarning $ "Failed to read job " ++ jids ++ ": " ++ msg
          return Nothing
        Ok (newJob, _) -> do
          logDebug $ "Read job " ++ jids ++ ", status is "
                     ++ show (calcJobStatus newJob)
          return $ Just jWS { jStat = newStat, jJob = newJob }
211
-- | Replace every job in the list that has the same job id as the given
-- job by that job. This is the pure function for inserting a previously
-- read change into the queue. As the change contains its time stamp, we
-- don't have to worry about a later read change overwriting a newer read
-- state: if this happens, the fstat value will be outdated, so the next
-- poller run will fix this.
updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat]
updateJobStatus job' = map replace
  where
    targetId = qjId $ jJob job'
    replace job
      | qjId (jJob job) == targetId = job'
      | otherwise = job
221
-- | Update a single job by rereading it from disk, if necessary.
--
-- If the reread job is finalized, or nothing was reread (file unchanged
-- or unreadable), a separate thread is forked that cleans up finished
-- jobs and schedules new ones.
updateJob :: JQStatus -> JobWithStat -> IO ()
updateJob state jb = do
  reread <- readJobStatus jb
  case reread of
    Just jb' -> modifyJobs state . onRunningJobs $ updateJobStatus jb'
    Nothing -> return ()
  let mayHaveFinished = maybe True (jobFinalized . jJob) reread
  when mayHaveFinished . void . forkIO $ do
    logDebug "Scheduler noticed a job to have finished."
    cleanupFinishedJobs state
    scheduleSomeJobs state
231
-- | Move a job from one part of the queue to another, appending it to the
-- end of the target list. Return the job that was moved, or 'Nothing' if
-- it wasn't found in the source part of the queue.
moveJob :: Lens' Queue [JobWithStat] -- ^ from queue
        -> Lens' Queue [JobWithStat] -- ^ to queue
        -> JobId
        -> Queue
        -> (Queue, Maybe JobWithStat)
moveJob fromQ toQ jid queue =
  -- Traversing the source list in the @(,) [JobWithStat]@ functor splits
  -- out the matching jobs while rebuilding the queue with the others.
  case traverseOf fromQ (partition isTarget) queue of
    (job : _, rest) -> (over toQ (++ [job]) rest, Just job)
    ([], _) -> (queue, Nothing)
  where
    isTarget = (== jid) . qjId . jJob
245
-- | Atomically move a job from one part of the queue to another, see
-- 'moveJob'. Return the job that was moved, or 'Nothing' if it wasn't
-- found in the queue.
moveJobAtomic :: Lens' Queue [JobWithStat] -- ^ from queue
              -> Lens' Queue [JobWithStat] -- ^ to queue
              -> JobId
              -> JQStatus
              -> IO (Maybe JobWithStat)
moveJobAtomic fromQ toQ jid qstat = atomicModifyIORef jobsRef move
  where
    jobsRef = jqJobs qstat
    move = moveJob fromQ toQ jid
256
-- | Manipulate a running job: atomically move it from 'qRunning' into
-- 'qManipulated', run the given IO action, and then (even if the action
-- throws) atomically move it back.
--
-- Returns the result of the IO action, or 'Nothing' if the job wasn't
-- found among the running jobs (in which case the action is not run).
manipulateRunningJob :: JQStatus -> JobId -> IO a -> IO (Maybe a)
manipulateRunningJob qstat jid k = do
  taken <- moveJobAtomic qRunningL qManipulatedL jid qstat
  if isNothing taken
    then return Nothing
    else fmap Just k `finally` moveJobAtomic qManipulatedL qRunningL jid qstat
270
-- | Split the finalized jobs off the running part of the queue. This is
-- the pure part of 'cleanupFinishedJobs': it returns the queue without
-- the finalized jobs, together with the jobs that were removed.
sortoutFinishedJobs :: Queue -> (Queue, [JobWithStat])
sortoutFinishedJobs queue = (queue { qRunning = stillRunning }, finished)
  where
    (finished, stillRunning) = partition (jobFinalized . jJob) (qRunning queue)
278
-- | Remove the finalized jobs from the running part of the queue, log
-- them, and stop their inotify watchers. This is the IO wrapper around
-- the pure 'sortoutFinishedJobs'.
cleanupFinishedJobs :: JQStatus -> IO ()
cleanupFinishedJobs qstate = do
  finished <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs
  unless (null finished) . logInfo
    $ "Finished jobs: " ++ commaJoin (map describe finished)
  mapM_ releaseWatcher finished
  where
    describe = show . ((fromJobId . qjId) &&& calcJobStatus) . jJob
    releaseWatcher = maybe (return ()) killINotify . jINotify
289
-- | Watcher task for a job, to update it on file changes.
--
-- On an 'Ignored' event, and only if the job carries an inotify handle,
-- it first re-registers itself as the watch handler for the job file
-- ('Modify' and 'Delete' events). In all cases it then calls 'updateJob'.
jobWatcher :: JQStatus -> JobWithStat -> Event -> IO ()
jobWatcher state jWS e = do
  let jid = qjId $ jJob jWS
      jids = show $ fromJobId jid
  logInfo $ "Scheduler notified of change of job " ++ jids
  logDebug $ "Scheduler notify event for " ++ jids ++ ": " ++ show e
  -- Pattern-match instead of isJust/fromJust, so the handle is only
  -- used when it is known to exist.
  case (e, jINotify jWS) of
    (Ignored, Just inotify) -> do
      qdir <- queueDir
      let fpath = liveJobFile qdir jid
      void $ addWatch inotify [Modify, Delete] fpath (jobWatcher state jWS)
    _ -> return ()
  updateJob state jWS
306
-- | Attach an inotify watcher to a running job that does not have one yet.
--
-- A watcher is only attached while the run queue length is below the
-- configured maximal number of tracked jobs; otherwise this is only
-- logged. The job entry in the running queue is updated to carry the new
-- inotify handle.
attachWatcher :: JQStatus -> JobWithStat -> IO ()
attachWatcher state jWS = when (isNothing $ jINotify jWS) $ do
  max_watch <- getMaxTrackedJobs state
  rql <- getRQL state
  if rql < max_watch
    then do
      qdir <- queueDir
      inotify <- initINotify
      let fpath = liveJobFile qdir . qjId $ jJob jWS
          jWS' = jWS { jINotify=Just inotify }
      logDebug $ "Attaching queue watcher for " ++ fpath
      -- If adding the watch throws, release the freshly created inotify
      -- instance instead of leaking it; the exception still propagates.
      _ <- addWatch inotify [Modify, Delete] fpath (jobWatcher state jWS')
             `onException` killINotify inotify
      modifyJobs state . onRunningJobs $ updateJobStatus jWS'
    else logDebug $ "Not attaching watcher for job "
                    ++ (show . fromJobId . qjId $ jJob jWS)
                    ++ ", run queue length is " ++ show rql
324
-- | For a queued job, determine whether it is eligible to run, i.e.,
-- whether none of the jobs it depends on is running or enqueued.
jobEligible :: Queue -> JobWithStat -> Bool
jobEligible queue jWS = not $ any blocks activeJobs
  where
    deps = getJobDependencies $ jJob jWS
    blocks = (`elem` deps) . qjId . jJob
    activeJobs = qRunning queue ++ qEnqueued queue
332
-- | The opcode of the first meta-opcode derived from the job's first
-- operation, or 'Nothing' if the job has no operations or the first
-- operation yields no meta-opcode.
extractFirstOpCode :: JobWithStat -> Maybe OpCode
extractFirstOpCode job = do
  firstOp <- listToMaybe . qjOps $ jJob job
  metaOpCode <$> listToMaybe (JQ.toMetaOpCode $ qoInput firstOp)
338
-- | Sort the given jobs by their static lock weight in relation to the
-- currently running jobs, adjusted by the time they have been waiting
-- since they were received (relative to the given current time).
sortByStaticLocks :: ConfigData
                  -> Queue
                  -> Timestamp -- Current time
                  -> [JobWithStat]
                  -> [JobWithStat]
sortByStaticLocks cfg queue currTime =
  -- Compute each job's weight once (decorate-sort-undecorate) instead of
  -- recomputing it on every comparison. 'sortBy' is stable, so jobs of
  -- equal weight keep their relative order, exactly as before.
  map snd . sortBy (comparing fst) . map (opWeight &&& id)
  where opWeight :: JobWithStat -> Double
        opWeight job = adjustedWeight currTime (recvTime job)
                       . staticWeight cfg (extractFirstOpCode job) $ runningOps
        recvTime = fromMaybe noTimestamp . qjReceivedTimestamp . jJob
        runningOps = mapMaybe extractFirstOpCode . qRunning $ queue
352
-- | Decide on which jobs to schedule next for execution. This is the
-- pure function doing the scheduling.
--
-- The enqueued jobs are first ordered by static lock weight (only if the
-- predictive scheduler is enabled). They are then restricted to eligible
-- jobs and stably sorted by priority. Next they pass the reason rate
-- limiter and the filter rules. Finally, as many as there are free slots
-- (running and manipulated jobs both occupy slots) are moved to the
-- running part of the queue. Returns the new queue and the chosen jobs.
selectJobsToRun :: ConfigData
                -> Int -- How many jobs are allowed to run at the same time.
                -> Bool -- If the predictive scheduler is enabled
                -> Timestamp -- Current time
                -> Set FilterRule -- Filter rules to respect for scheduling
                -> Queue
                -> (Queue, [JobWithStat])
selectJobsToRun cfg count isPredictive currTime filters queue =
  (queue { qEnqueued = remain, qRunning = qRunning queue ++ chosen }, chosen)
  where
    freeSlots = count - length (qRunning queue) - length (qManipulated queue)
    preorder
      | isPredictive = sortByStaticLocks cfg queue currTime
      | otherwise = id
    eligible = filter (jobEligible queue) . preorder $ qEnqueued queue
    byPriority = sortBy (comparing (calcJobPriority . jJob)) eligible
    chosen = take freeSlots
             . jobFiltering queue filters
             . reasonRateLimit queue
             $ byPriority
    remain = deleteFirstsBy ((==) `on` (qjId . jJob)) (qEnqueued queue) chosen
376
-- | Log a warning about jobs that failed to start, together with their
-- errors, and return the set of their job IDs. Nothing is logged for an
-- empty list.
logFailedJobs :: (MonadLog m)
              => [(JobWithStat, GanetiException)] -> m (S.Set JobId)
logFailedJobs jobs
  | null jobs = return S.empty
  | otherwise = do
      let jids = S.fromList $ map (qjId . jJob . fst) jobs
          jidsString = commaJoin [ show (fromJobId j) | j <- S.toList jids ]
      logWarning $ "Starting jobs " ++ jidsString ++ " failed: "
                   ++ show (map snd jobs)
      return jids
387
-- | Fail jobs that were previously selected for execution
-- but couldn't be started.
--
-- The jobs are removed from the running part of the queue and marked as
-- failed, with a reason trail entry carrying the start error. They are
-- then written (and replicated) to disk. Errors while saving a failed
-- job are logged instead of being silently dropped.
failJobs :: ConfigData -> JQStatus -> [(JobWithStat, GanetiException)]
         -> IO ()
failJobs cfg qstate jobs = do
  qdir <- queueDir
  now <- currentTimestamp
  jids <- logFailedJobs jobs
  -- Separate the IDs by commas; dots made the list look like a version.
  let sjobs = intercalate "," . map (show . fromJobId) $ S.toList jids
  let rmJobs = filter ((`S.notMember` jids) . qjId . jJob)
  logWarning $ "Failing jobs " ++ sjobs
  modifyJobs qstate $ onRunningJobs rmJobs
  let trySaveJob :: JobWithStat -> ResultT String IO ()
      trySaveJob = (() <$) . writeAndReplicateJob cfg qdir . jJob
      reason jid msg =
        ( "gnt:daemon:luxid:startjobs"
        , "job " ++ show (fromJobId jid) ++ " failed to start: " ++ msg
        , reasonTrailTimestamp now )
      failJob err job = failQueuedJob (reason (qjId job) (show err)) now job
      failAndSaveJobWithStat (jws, err) =
        trySaveJob . over jJobL (failJob err) $ jws
      saveOne entry@(jws, _) = do
        saved <- runResultT $ failAndSaveJobWithStat entry
        genericResult
          (\msg -> logError $ "Failed to save failed job "
                              ++ show (fromJobId . qjId $ jJob jws)
                              ++ ": " ++ msg)
          return
          saved
  mapM_ saveOne jobs
  logDebug $ "Failed jobs " ++ sjobs
411
412
-- | Checks if any enqueued jobs match a REJECT filter rule, and cancels
-- them.
--
-- A job that can still be dequeued is written back to disk in cancelled
-- state. A job that is no longer queued is cancelled directly via
-- 'cancelJob' (sigTERM-kill only). Errors are logged, not propagated.
cancelRejectedJobs :: JQStatus -> ConfigData -> Set FilterRule -> IO ()
cancelRejectedJobs qstate cfg filters = do

  enqueuedJobs <- map jJob . qEnqueued <$> readIORef (jqJobs qstate)

  -- Determine which jobs are rejected.
  let jobsToCancel =
        [ (job, fr) | job <- enqueuedJobs
                    , Just fr <- [applyingFilter filters job]
                    , frAction fr == Reject ]

  -- Cancel them.
  qDir <- queueDir
  forM_ jobsToCancel $ \(job, fr) -> do
    let jid = qjId job
        sjid = show (fromJobId jid)
    logDebug $ "Cancelling job " ++ sjid
               ++ " because it was REJECTed by filter rule " ++ uuidOf fr
    -- First dequeue, then cancel.
    dequeueResult <- dequeueJob qstate jid
    case dequeueResult of
      Ok True -> do
        now <- currentTimestamp
        r <- runResultT
               $ writeAndReplicateJob cfg qDir (cancelQueuedJob now job)
        case r of
          Ok _ -> return ()
          -- It is the job file that failed to be written, not the config.
          Bad err -> logError $
            "Failed to write job " ++ sjid ++ " when cancelling it: " ++ err
      Ok False -> do
        logDebug $ "Job " ++ sjid ++ " not queued; trying to cancel directly"
        _ <- cancelJob False (jqLivelock qstate) jid -- sigTERM-kill only
        return ()
      Bad s -> logError s -- passing a nonexistent job ID is an error here
448
449
-- | Schedule jobs to be run. This is the IO wrapper around the
-- pure 'selectJobsToRun'.
--
-- Without a configuration, only an error is logged. Otherwise, rejected
-- jobs are cancelled first. Then the chosen jobs get watchers attached
-- and are started; jobs that fail to start are failed via 'failJobs'.
scheduleSomeJobs :: JQStatus -> IO ()
scheduleSomeJobs qstate =
  readIORef (jqConfig qstate) >>= genericResult noConfig withConfig
  where
    noConfig err = logError $ "Configuration unavailable: " ++ err
    withConfig cfg = do
      let filters = S.fromList . Map.elems . fromContainer $ configFilters cfg
      -- Check if jobs are rejected by a REJECT filter, and cancel them.
      cancelRejectedJobs qstate cfg filters
      ts <- currentTimestamp
      -- Select the jobs to run.
      count <- getMaxRunningJobs qstate
      isPredictive <- getEnabledPredictiveQueue qstate
      chosen <- atomicModifyIORef (jqJobs qstate)
                  $ selectJobsToRun cfg count isPredictive ts filters
      let jobs = map jJob chosen
      unless (null chosen) . logInfo . (++) "Starting jobs: " . commaJoin
        $ map (show . fromJobId . qjId) jobs
      -- Attach the watcher.
      mapM_ (attachWatcher qstate) chosen
      -- Start the jobs.
      result <- JQ.startJobs (jqLivelock qstate) (jqForkLock qstate) jobs
      let failed = [ (job, err) | (job, Bad err) <- zip chosen result ]
      unless (null failed) $ failJobs cfg qstate failed
486
-- | Render the IDs of the waiting and the running jobs of the queue in a
-- compact, human-readable way.
showQueue :: Queue -> String
showQueue Queue { qEnqueued = waiting, qRunning = running } =
  concat [ "Waiting jobs: ", ids waiting, "; running jobs: ", ids running ]
  where ids = show . map (fromJobId . qjId . jJob)
493
-- | Check whether a job died, and clean up if so. Returns True if the job
-- was found dead.
--
-- Only a livelock different from the scheduler's own is checked with
-- 'isDead'. For a dead job that is not finalized on disk, a failed status
-- is written back, provided the job can be taken out of the running queue
-- (see 'manipulateRunningJob').
checkForDeath :: JQStatus -> JobWithStat -> IO Bool
checkForDeath state jobWS = do
  logDebug $ "Livelock of job " ++ sjid ++ " is " ++ show livelock
  died <- case mfilter (/= jqLivelock state) livelock of
            Nothing -> return False
            Just otherLock -> isDead otherLock
  logDebug $ "Death of " ++ sjid ++ ": " ++ show died
  when died $ do
    logInfo $ "Detected death of job " ++ sjid
    -- if we manage to remove the job from the queue, we own the job file
    -- and can manipulate it.
    void . manipulateRunningJob state jid $ runResultT markFailed
  return died
  where
    job = jJob jobWS
    jid = qjId job
    sjid = show $ fromJobId jid
    livelock = qjLivelock job
    -- Reread the job; if it isn't finalized, but dead, add a corresponding
    -- failed status and write it back.
    markFailed :: ResultG ()
    markFailed = do
      onDisk <- mkResultT $ readJobFromDisk jid
      unless (jobFinalized . jJob $ onDisk) . void $ do
        now <- liftIO currentTimestamp
        qDir <- liftIO queueDir
        let reason = ( "gnt:daemon:luxid:deathdetection"
                     , "detected death of job " ++ sjid
                     , reasonTrailTimestamp now )
            failedJob = failQueuedJob reason now $ jJob onDisk
        cfg <- mkResultT . readIORef $ jqConfig state
        writeAndReplicateJob cfg qDir failedJob
525
-- | Trigger job-death detection for the job with the given job id.
--
-- A job that is no longer among the running jobs counts as dead. If the
-- job is dead, the post-hooks process for it is started (while holding
-- the fork lock) and True is returned.
cleanupIfDead :: JQStatus -> JobId -> IO Bool
cleanupIfDead state jid = do
  logDebug $ "Extra job-death detection for " ++ sjid
  jobs <- readIORef (jqJobs state)
  let jobWS = find ((==) jid . qjId . jJob) $ qRunning jobs
  dead <- maybe (return True) (checkForDeath state) jobWS
  when dead runHooks
  return dead
  where
    sjid = show $ fromJobId jid
    -- Start the post hooks. A failure to start them is logged as a
    -- warning (not hidden at debug level), and the messages now have
    -- proper spacing.
    runHooks = do
      r <- runResultT . withLock (jqForkLock state)
             $ forkPostHooksProcess jid
      genericResult
        (\err -> logWarning $ "Error starting post hooks process "
                              ++ "for disappeared job " ++ sjid
                              ++ ": " ++ err)
        (\pid -> logDebug $ "Post hooks for disappeared job "
                            ++ sjid ++ " have started in " ++ show pid)
        r
550
-- | Force the queue to check the state of all jobs.
--
-- First checks every running job for death, then rereads every
-- (remaining) running job. Next it cleans up finished jobs and logs the
-- queue. Finally it schedules new jobs.
updateStatusAndScheduleSomeJobs :: JQStatus -> IO ()
updateStatusAndScheduleSomeJobs qstate = do
  runningNow >>= mapM_ (checkForDeath qstate)
  runningNow >>= mapM_ (updateJob qstate)
  cleanupFinishedJobs qstate
  readIORef (jqJobs qstate) >>= logInfo . showQueue
  scheduleSomeJobs qstate
  where
    runningNow = qRunning <$> readIORef (jqJobs qstate)
562
-- | Time-based watcher for updating the job queue: forever sleeps for the
-- poll interval and then runs a full status update and scheduling pass.
onTimeWatcher :: JQStatus -> IO ()
onTimeWatcher qstate = forever pollOnce
  where
    pollOnce = do
      threadDelaySeconds C.luxidJobqueuePollInterval
      logDebug "Job queue watcher timer fired"
      updateStatusAndScheduleSomeJobs qstate
      logDebug "Job queue watcher cycle finished"
570
-- | Read a single, non-archived job, specified by its id, from disk.
-- If the file cannot be stat'ed, the null fstat is recorded. The job
-- starts without an inotify handle.
readJobFromDisk :: JobId -> IO (Result JobWithStat)
readJobFromDisk jid = do
  qdir <- queueDir
  let fpath = liveJobFile qdir jid
  logDebug $ "Reading " ++ fpath
  statResult <- try (getFStat fpath) :: IO (Either IOError FStat)
  let fstat = either (const nullFStat) id statResult
      wrap = JobWithStat Nothing fstat . fst
  fmap wrap <$> JQ.loadJobFromDisk qdir False jid
581
-- | Read all non-archived jobs from disk, in sorted job-ID order, keeping
-- only those that could be loaded successfully. If the job IDs cannot be
-- listed, no jobs are returned.
readJobsFromDisk :: IO [JobWithStat]
readJobsFromDisk = do
  logInfo "Loading job queue"
  qdir <- queueDir
  jids <- genericResult (const []) JQ.sortJobIDs <$> JQ.getJobIDs [qdir]
  logInfo $ "Non-archived jobs on disk: "
            ++ commaJoin (map (show . fromJobId) jids)
  justOk <$> mapM readJobFromDisk jids
593
-- | Set up the job scheduler. This loads the non-finalized jobs from disk
-- (started ones as running, the others as enqueued) and runs a first
-- scheduling pass. It then forks the time-based watcher that monitors
-- the running jobs.
initJQScheduler :: JQStatus -> IO ()
initJQScheduler qstate = do
  diskJobs <- readJobsFromDisk
  let pending = filter (not . jobFinalized . jJob) diskJobs
      (running, queued) = partition (jobStarted . jJob) pending
  modifyJobs qstate $ onQueuedJobs (++ queued) . onRunningJobs (++ running)
  readIORef (jqJobs qstate) >>= logInfo . showQueue
  scheduleSomeJobs qstate
  logInfo "Starting time-based job queue watcher"
  void . forkIO $ onTimeWatcher qstate
608
-- | Enqueue new jobs, inserting each into the waiting list in job-ID
-- order, and run a scheduling pass. This guarantees that the jobs will
-- be executed eventually.
enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO ()
enqueueNewJobs state jobs = do
  logInfo $ "New jobs enqueued: "
            ++ commaJoin (map (show . fromJobId . qjId) jobs)
  modifyJobs state . onQueuedJobs $ \oldjobs ->
    foldl (flip insertFn) oldjobs (map unreadJob jobs)
  scheduleSomeJobs state
  where
    insertFn = insertBy (comparing (fromJobId . qjId . jJob))
620
-- | Pure function for removing a queued job from the job queue, suitable
-- for 'atomicModifyIORef'.
--
-- The answer is @Ok (Just job)@ if the job was removed before being
-- handed over to execution, and @Ok Nothing@ if it is already running.
-- A 'Bad' result means the job is not found in the queue, or the job is
-- enqueued more than once (an inconsistent queue; nothing is removed).
rmJob :: JobId -> Queue -> (Queue, Result (Maybe QueuedJob))
rmJob jid q =
  let isJid = (jid ==) . qjId . jJob
      (found, queued') = partition isJid $ qEnqueued q
      isRunning = any isJid $ qRunning q
      sJid = (++) "Job " . show $ fromJobId jid
  in case (found, isRunning) of
       ([job], _) -> (q {qEnqueued = queued'}, Ok . Just $ jJob job)
       -- separate the two sentences of the message with a space
       (_:_, _) -> (q, Bad $ "Queue in inconsistent state. "
                             ++ sJid ++ " queued multiple times")
       (_, True) -> (q, Ok Nothing)
       _ -> (q, Bad $ sJid ++ " not found in queue")
637
-- | Try to remove a queued job from the job queue. Return True if the job
-- could be removed from the queue before being handed over to execution,
-- False if the job already started, and a Bad result if the job is
-- unknown.
dequeueJob :: JQStatus -> JobId -> IO (Result Bool)
dequeueJob state jid = do
  outcome <- fmap isJust <$> atomicModifyIORef (jqJobs state) (rmJob jid)
  logDebug $ "Result of dequeing job " ++ show (fromJobId jid)
             ++ " is " ++ show outcome
  return outcome
649
-- | Change the priority of a queued job (once the job is handed over
-- to execution, the job itself needs to be informed). To avoid the
-- job being started unmodified, it is temporarily unqueued during the
-- change. Return the modified job if the job's priority was successfully
-- modified, Nothing if the job already started, and a Bad value if the
-- job is unknown or the modified job could not be written to disk.
setJobPriority :: JQStatus -> JobId -> Int -> IO (Result (Maybe QueuedJob))
setJobPriority state jid prio = runResultT $ do
  maybeJob <- mkResultT . atomicModifyIORef (jqJobs state) $ rmJob jid
  case maybeJob of
    Nothing -> return Nothing
    Just job -> do
      let job' = changeJobPriority prio job
      qDir <- liftIO queueDir
      mkResultT $ do
        written <- writeJobToDisk qDir job'
        -- The job was already taken out of the queue; if the modified job
        -- cannot be persisted, put the unmodified job back so it is not
        -- lost from the scheduler's view.
        genericResult (const $ enqueueNewJobs state [job])
                      (const $ return ())
                      written
        return written
      liftIO $ enqueueNewJobs state [job']
      return $ Just job'
667
668
-- | Given old and new configs, determines whether the changes between them
-- should trigger the scheduler to run: this is the case when the filter
-- rules or the maximal number of running jobs changed.
configChangeNeedsRescheduling :: ConfigData -> ConfigData -> Bool
configChangeNeedsRescheduling old new = filtersChanged || runQueueLengthChanged
  where
    filtersChanged = configFilters old /= configFilters new
    runQueueLengthChanged =
      clusterMaxRunningJobs (configCluster old)
        /= clusterMaxRunningJobs (configCluster new)