Prefer the UuidObject type class over specific functions
[ganeti-github.git] / src / Ganeti / JQScheduler.hs
1 {-# LANGUAGE RankNTypes #-}
2 {-| Implementation of a reader for the job queue.
3
4 -}
5
6 {-
7
8 Copyright (C) 2013 Google Inc.
9 All rights reserved.
10
11 Redistribution and use in source and binary forms, with or without
12 modification, are permitted provided that the following conditions are
13 met:
14
15 1. Redistributions of source code must retain the above copyright notice,
16 this list of conditions and the following disclaimer.
17
18 2. Redistributions in binary form must reproduce the above copyright
19 notice, this list of conditions and the following disclaimer in the
20 documentation and/or other materials provided with the distribution.
21
22 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
23 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
24 TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
25 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
26 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
27 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
28 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
29 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
30 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
31 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
32 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33
34 -}
35
36 module Ganeti.JQScheduler
37 ( JQStatus
38 , jqLivelock
39 , jqForkLock
40 , emptyJQStatus
41 , selectJobsToRun
42 , scheduleSomeJobs
43 , initJQScheduler
44 , enqueueNewJobs
45 , dequeueJob
46 , setJobPriority
47 , cleanupIfDead
48 , configChangeNeedsRescheduling
49 ) where
50
51 import Control.Applicative (liftA2, (<$>))
52 import Control.Arrow
53 import Control.Concurrent
54 import Control.Exception
55 import Control.Monad
56 import Control.Monad.IO.Class
57 import Data.Function (on)
58 import Data.Functor ((<$))
59 import Data.IORef
60 import Data.List
61 import Data.Maybe
62 import qualified Data.Map as Map
63 import Data.Ord (comparing)
64 import Data.Set (Set)
65 import qualified Data.Set as S
66 import System.INotify
67
68 import Ganeti.BasicTypes
69 import Ganeti.Constants as C
70 import Ganeti.Errors
71 import Ganeti.JQScheduler.Filtering (applyingFilter, jobFiltering)
72 import Ganeti.JQScheduler.Types
73 import Ganeti.JQScheduler.ReasonRateLimiting (reasonRateLimit)
74 import Ganeti.JQueue as JQ
75 import Ganeti.JSON (fromContainer)
76 import Ganeti.Lens hiding (chosen)
77 import Ganeti.Logging
78 import Ganeti.Objects
79 import Ganeti.Path
80 import Ganeti.Types
81 import Ganeti.Utils
82 import Ganeti.Utils.Livelock
83 import Ganeti.Utils.MVarLock
84
85
{-| Representation of the job queue

We keep three lists of jobs (together with information about the last
fstat result observed): the jobs that are enqueued, but not yet handed
over for execution ('qEnqueued'); the jobs already handed over for
execution ('qRunning'); and running jobs that are temporarily taken out
of 'qRunning' while they are being manipulated ('qManipulated'). They are
kept together in a single IORef, so that we can atomically update all of
them, in particular when scheduling jobs to be handed over for execution.

-}
data JQStatus = JQStatus
  { jqJobs     :: IORef Queue               -- ^ the job queue itself
  , jqConfig   :: IORef (Result ConfigData) -- ^ current configuration, or
                                            -- an error if it is unavailable
  , jqLivelock :: Livelock                  -- ^ livelock of the daemon owning
                                            -- this queue; jobs carrying it
                                            -- are not checked for death
  , jqForkLock :: Lock                      -- ^ lock handed to 'JQ.startJobs'
  }
102
103
-- | Create a queue status with no jobs at all, the given configuration
-- reference, a livelock obtained via 'mkLivelockFile' with the luxi
-- prefix, and a fresh fork lock.
emptyJQStatus :: IORef (Result ConfigData) -> IO JQStatus
emptyJQStatus config = do
  let noJobs = Queue { qEnqueued = [], qRunning = [], qManipulated = [] }
  jobsRef <- newIORef noJobs
  (_, ownLivelock) <- mkLivelockFile C.luxiLivelockPrefix
  lock <- newLock
  return JQStatus { jqJobs = jobsRef
                  , jqConfig = config
                  , jqLivelock = ownLivelock
                  , jqForkLock = lock
                  }
111
-- | Transform the list of running jobs of a queue, leaving the rest as is.
onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onRunningJobs f = over qRunningL f
115
-- | Transform the list of enqueued (not yet started) jobs of a queue,
-- leaving the rest as is.
onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onQueuedJobs f = over qEnqueuedL f
119
-- | Wrap a 'QueuedJob' for which no file status has been observed yet
-- ('nullFStat') and which has no inotify watcher attached.
unreadJob :: QueuedJob -> JobWithStat
unreadJob job =
  JobWithStat { jINotify = Nothing
              , jStat = nullFStat
              , jJob = job
              }
123
-- | Interval, in microseconds, at which the running jobs are polled for
-- updates; the poll interval constant is scaled by one million.
watchInterval :: Int
watchInterval = 1000000 * C.luxidJobqueuePollInterval
127
-- | Look up a cluster parameter in the current configuration, falling
-- back to the supplied default while no configuration is available.
getConfigValue :: (Cluster -> a) -> a -> JQStatus -> IO a
getConfigValue param defaultvalue qstat = do
  cfgResult <- readIORef (jqConfig qstat)
  return $ genericResult (const defaultvalue) (param . configCluster) cfgResult
134
-- | Get the maximal number of jobs to be run simultaneously from the
-- configuration. Without a configuration, be conservative and use the
-- smallest possible value, i.e., 1.
getMaxRunningJobs :: JQStatus -> IO Int
getMaxRunningJobs qstat = getConfigValue clusterMaxRunningJobs 1 qstat
140
-- | Get the maximal number of jobs to be tracked (watched via inotify)
-- simultaneously from the configuration. Without a configuration, be
-- conservative and use the smallest possible value, i.e., 1.
getMaxTrackedJobs :: JQStatus -> IO Int
getMaxTrackedJobs qstat = getConfigValue clusterMaxTrackedJobs 1 qstat
146
-- | Length of the running part of the queue, i.e., the number of jobs
-- currently handed over for execution (excluding manipulated ones).
getRQL :: JQStatus -> IO Int
getRQL qstat = do
  queue <- readIORef (jqJobs qstat)
  return . length $ qRunning queue
150
-- | Atomically apply a pure transformation to the job queue of the status.
modifyJobs :: JQStatus -> (Queue -> Queue) -> IO ()
modifyJobs qstat f =
  atomicModifyIORef (jqJobs qstat) $ \queue -> (f queue, ())
154
-- | Reread a job from disk, if its file changed since the recorded fstat.
-- Returns the job with the new fstat and contents (the inotify handle is
-- kept), or 'Nothing' if the file is unchanged or could not be loaded.
-- An 'IOError' while checking the file counts as a change, forcing a
-- reread.
readJobStatus :: JobWithStat -> IO (Maybe JobWithStat)
readJobStatus jWS@JobWithStat { jStat = oldStat, jJob = job } = do
  let jid = qjId job
  qdir <- queueDir
  let fpath = liveJobFile qdir jid
  logDebug $ "Checking if " ++ fpath ++ " changed on disk."
  statCheck <- try (needsReload oldStat fpath)
                 :: IO (Either IOError (Maybe FStat))
  case either (const (Just nullFStat)) id statCheck of
    Nothing -> do
      logDebug $ "File " ++ fpath ++ " not changed on disk."
      return Nothing
    Just newStat -> do
      let jidStr = show (fromJobId jid)
      logDebug $ "Rereading job " ++ jidStr
      loaded <- loadJobFromDisk qdir True jid
      case loaded of
        Bad msg -> do
          logWarning $ "Failed to read job " ++ jidStr ++ ": " ++ msg
          return Nothing
        Ok (newJob, _) -> do
          logDebug $ "Read job " ++ jidStr ++ ", status is "
                     ++ show (calcJobStatus newJob)
          -- only the stat and the job change; jINotify is carried over
          return $ Just jWS { jStat = newStat, jJob = newJob }
182
-- | Replace the entry with the same job id as the given (reread) job in a
-- list of jobs; other entries are untouched, and nothing is inserted if
-- the job is absent. This is the pure part of inserting a previously read
-- change into the queue. As the change carries its own fstat, a later
-- read overwriting a newer state only leaves an outdated fstat behind,
-- which the next poller run will fix.
updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat]
updateJobStatus job' = map replace
  where
    target = qjId (jJob job')
    replace job
      | qjId (jJob job) == target = job'
      | otherwise = job
192
-- | Update a single running job by rereading it from disk, if necessary.
-- Unless the reread produced a job that is still not finalized, a
-- background thread is forked that cleans up finished jobs and schedules
-- new ones.
updateJob :: JQStatus -> JobWithStat -> IO ()
updateJob state jb = do
  reread <- readJobStatus jb
  case reread of
    Just jb' -> modifyJobs state . onRunningJobs $ updateJobStatus jb'
    Nothing -> return ()
  let needsCleanup = maybe True (jobFinalized . jJob) reread
  when needsCleanup . void . forkIO $ do
    logDebug "Scheduler noticed a job to have finished."
    cleanupFinishedJobs state
    scheduleSomeJobs state
202
-- | Move a job from one part of the queue to another, appending it at the
-- end of the target list. Return the job that was moved, or 'Nothing' if
-- it wasn't found in the source part (the queue is then left unchanged).
-- NOTE(review): should the id occur several times in the source, all
-- occurrences are removed but only the first is moved.
moveJob :: Lens' Queue [JobWithStat] -- ^ from queue
        -> Lens' Queue [JobWithStat] -- ^ to queue
        -> JobId
        -> Queue
        -> (Queue, Maybe JobWithStat)
moveJob fromQ toQ jid queue =
  -- Traversing with the @(,) [JobWithStat]@ functor splits off the
  -- matching jobs while rebuilding the queue without them.
  let isTarget = (== jid) . qjId . jJob
      (hits, stripped) = traverseOf fromQ (partition isTarget) queue
  in case hits of
       firstHit : _ -> (over toQ (++ [firstHit]) stripped, Just firstHit)
       [] -> (queue, Nothing)
216
-- | Run 'moveJob' atomically on the queue of the status. Return the job
-- that was moved, or 'Nothing' if it wasn't found in the source part.
moveJobAtomic :: Lens' Queue [JobWithStat] -- ^ from queue
              -> Lens' Queue [JobWithStat] -- ^ to queue
              -> JobId
              -> JQStatus
              -> IO (Maybe JobWithStat)
moveJobAtomic fromQ toQ jid qstat =
  atomicModifyIORef (jqJobs qstat) $ \queue -> moveJob fromQ toQ jid queue
227
-- | Manipulate a running job: atomically move it from 'qRunning' into
-- 'qManipulated', run the given IO action, and (even if the action
-- throws) atomically move the job back.
--
-- Returns the result of the action, or 'Nothing' without running it if
-- the job is not among the running jobs.
manipulateRunningJob :: JQStatus -> JobId -> IO a -> IO (Maybe a)
manipulateRunningJob qstat jid k = do
  taken <- moveJobAtomic qRunningL qManipulatedL jid qstat
  maybe (return Nothing) (const runAndRestore) taken
  where
    runAndRestore =
      fmap Just k `finally` moveJobAtomic qManipulatedL qRunningL jid qstat
241
-- | Pure part of the cleanup: split the finalized jobs off the running
-- part of the queue, returning the remaining queue and the removed jobs.
sortoutFinishedJobs :: Queue -> (Queue, [JobWithStat])
sortoutFinishedJobs queue = (queue { qRunning = stillRunning }, finished)
  where
    (finished, stillRunning) = partition (jobFinalized . jJob) (qRunning queue)
249
-- | Clean up the finished jobs: atomically remove them from the running
-- part via 'sortoutFinishedJobs', log them, and release their inotify
-- watchers.
cleanupFinishedJobs :: JQStatus -> IO ()
cleanupFinishedJobs qstate = do
  finished <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs
  let describe = show . ((fromJobId . qjId) &&& calcJobStatus) . jJob
  unless (null finished) . logInfo $
    "Finished jobs: " ++ commaJoin (map describe finished)
  forM_ finished $ \jws -> maybe (return ()) killINotify (jINotify jws)
260
-- | Watcher task for a job, to update it on file changes. Upon receiving
-- an 'Ignored' event it reinstantiates itself on the job's file, provided
-- the job carries an inotify handle. In every case the job is then
-- updated via 'updateJob'.
jobWatcher :: JQStatus -> JobWithStat -> Event -> IO ()
jobWatcher state jWS e = do
  let jid = qjId $ jJob jWS
      jids = show $ fromJobId jid
  logInfo $ "Scheduler notified of change of job " ++ jids
  logDebug $ "Scheduler notify event for " ++ jids ++ ": " ++ show e
  -- Match on the handle instead of using 'fromJust' after an 'isJust' test.
  case (e, jINotify jWS) of
    (Ignored, Just inotify) -> do
      qdir <- queueDir
      let fpath = liveJobFile qdir jid
      void $ addWatch inotify [Modify, Delete] fpath (jobWatcher state jWS)
    _ -> return ()
  updateJob state jWS
277
-- | Attach the job watcher to a running job, unless it already has one.
-- A watcher is only attached while the run queue length is below the
-- configured maximum of tracked jobs; otherwise the job is only updated
-- by the time-based poller. If adding the watch throws, the freshly
-- created inotify instance is released before the exception propagates.
attachWatcher :: JQStatus -> JobWithStat -> IO ()
attachWatcher state jWS = when (isNothing $ jINotify jWS) $ do
  max_watch <- getMaxTrackedJobs state
  rql <- getRQL state
  if rql < max_watch
    then do
      inotify <- initINotify
      qdir <- queueDir
      let fpath = liveJobFile qdir . qjId $ jJob jWS
          jWS' = jWS { jINotify = Just inotify }
      logDebug $ "Attaching queue watcher for " ++ fpath
      -- Don't leak the inotify instance if the watch cannot be set up.
      _ <- addWatch inotify [Modify, Delete] fpath (jobWatcher state jWS')
             `onException` killINotify inotify
      modifyJobs state . onRunningJobs $ updateJobStatus jWS'
    else logDebug $ "Not attaching watcher for job "
                    ++ (show . fromJobId . qjId $ jJob jWS)
                    ++ ", run queue length is " ++ show rql
295
-- | For a queued job, decide whether it may run: none of the jobs it
-- depends on may still be running or enqueued.
jobEligible :: Queue -> JobWithStat -> Bool
jobEligible queue jWS =
  let deps = getJobDependencies (jJob jWS)
      pending = liftA2 (++) qRunning qEnqueued queue
      isDependency other = qjId (jJob other) `elem` deps
  in not (any isDependency pending)
303
-- | Pure scheduling step: pick the next jobs to hand over for execution.
-- Eligible enqueued jobs are ordered by priority, rate-limited and
-- filtered, and as many as there are free slots are moved from the
-- enqueued to the running part. Returns the new queue and the chosen jobs.
selectJobsToRun :: Int  -- ^ How many jobs are allowed to run at the
                        -- same time.
                -> Set FilterRule -- ^ Filter rules to respect for scheduling
                -> Queue
                -> (Queue, [JobWithStat])
selectJobsToRun count filters queue = (queue', picked)
  where
    -- Free slots; a non-positive number means nothing is taken.
    slots = count - length (qRunning queue) - length (qManipulated queue)
    byPriority = sortBy (comparing (calcJobPriority . jJob))
                 . filter (jobEligible queue)
                 $ qEnqueued queue
    picked = take slots
             . jobFiltering queue filters
             . reasonRateLimit queue
             $ byPriority
    sameJob = (==) `on` (qjId . jJob)
    remaining = deleteFirstsBy sameJob (qEnqueued queue) picked
    queue' = queue { qEnqueued = remaining
                   , qRunning = qRunning queue ++ picked }
321
-- | Log a warning about jobs that failed to start, together with their
-- errors, and return the set of their job IDs (empty input logs nothing).
logFailedJobs :: (MonadLog m)
              => [(JobWithStat, GanetiException)] -> m (S.Set JobId)
logFailedJobs [] = return S.empty
logFailedJobs jobs = do
  let failedIds = S.fromList [ qjId (jJob jws) | (jws, _) <- jobs ]
      idList = commaJoin [ show (fromJobId i) | i <- S.toList failedIds ]
  logWarning $ "Starting jobs " ++ idList ++ " failed: "
               ++ show (map snd jobs)
  return failedIds
332
-- | Fail jobs that were previously selected for execution but couldn't be
-- started. They are removed from the running part of the queue, any
-- inotify watcher attached to them is released, and they are written back
-- to disk in failed state. Failures to save a job are logged.
failJobs :: ConfigData -> JQStatus -> [(JobWithStat, GanetiException)]
         -> IO ()
failJobs cfg qstate jobs = do
  qdir <- queueDir
  now <- currentTimestamp
  jids <- logFailedJobs jobs
  let sjobs = commaJoin . map (show . fromJobId) $ S.toList jids
      isFailed = (`S.member` jids) . qjId . jJob
  logWarning $ "Failing jobs " ++ sjobs
  -- Remove the jobs atomically and keep the queue's own copies: a watcher
  -- may have been attached to them after selection (the copies passed in
  -- predate that), and its inotify handle would otherwise leak.
  removed <- atomicModifyIORef (jqJobs qstate) $ \queue ->
    let (failed, kept) = partition isFailed (qRunning queue)
    in (queue { qRunning = kept }, failed)
  mapM_ (maybe (return ()) killINotify . jINotify) removed
  let trySaveJob :: JobWithStat -> ResultT String IO ()
      trySaveJob = (() <$) . writeAndReplicateJob cfg qdir . jJob
      reason jid msg =
        ( "gnt:daemon:luxid:startjobs"
        , "job " ++ show (fromJobId jid) ++ " failed to start: " ++ msg
        , reasonTrailTimestamp now )
      failJob err job = failQueuedJob (reason (qjId job) (show err)) now job
      failAndSaveJobWithStat (jws, err) =
        trySaveJob . over jJobL (failJob err) $ jws
  forM_ jobs $ \failure@(jws, _) -> do
    saved <- runResultT $ failAndSaveJobWithStat failure
    case saved of
      Bad msg -> logWarning $ "Failed to save failed job "
                              ++ show (fromJobId . qjId $ jJob jws)
                              ++ ": " ++ msg
      Ok () -> return ()
  logDebug $ "Failed jobs " ++ sjobs
356
357
-- | Checks if any enqueued jobs match a REJECT filter rule, and cancels
-- them. A job that can still be dequeued is written back to disk in
-- cancelled state; a job that is no longer queued is cancelled directly
-- (SIGTERM only). Errors are logged, not propagated.
cancelRejectedJobs :: JQStatus -> ConfigData -> Set FilterRule -> IO ()
cancelRejectedJobs qstate cfg filters = do

  enqueuedJobs <- map jJob . qEnqueued <$> readIORef (jqJobs qstate)

  -- Determine which jobs are rejected.
  let jobsToCancel =
        [ (job, fr) | job <- enqueuedJobs
                    , Just fr <- [applyingFilter filters job]
                    , frAction fr == Reject ]

  -- Cancel them.
  qDir <- queueDir
  forM_ jobsToCancel $ \(job, fr) -> do
    let jid = qjId job
        sjid = show (fromJobId jid)
    logDebug $ "Cancelling job " ++ sjid
               ++ " because it was REJECTed by filter rule " ++ uuidOf fr
    -- First dequeue, then cancel.
    dequeueResult <- dequeueJob qstate jid
    case dequeueResult of
      Ok True -> do
        now <- currentTimestamp
        r <- runResultT
               $ writeAndReplicateJob cfg qDir (cancelQueuedJob now job)
        case r of
          Ok _ -> return ()
          -- It is the job file, not the configuration, that is written.
          Bad err -> logError $
            "Failed to write job file when cancelling job " ++ sjid
            ++ ": " ++ err
      Ok False -> do
        logDebug $ "Job " ++ sjid
                   ++ " not queued; trying to cancel directly"
        _ <- cancelJob False (jqLivelock qstate) jid -- sigTERM-kill only
        return ()
      Bad s -> logError s -- passing a nonexistent job ID is an error here
393
394
-- | Schedule jobs to be run; the IO wrapper around the pure
-- 'selectJobsToRun'. Without a configuration only an error is logged.
-- Otherwise jobs rejected by filter rules are cancelled, the next jobs
-- are selected, watchers are attached, the jobs are started, and those
-- that could not be started are failed.
scheduleSomeJobs :: JQStatus -> IO ()
scheduleSomeJobs qstate = readIORef (jqConfig qstate) >>= go
  where
    go (Bad err) = logError $ "Configuration unavailable: " ++ err
    go (Ok cfg) = do
      let filters = S.fromList . Map.elems . fromContainer $ configFilters cfg
      -- Check if jobs are rejected by a REJECT filter, and cancel them.
      cancelRejectedJobs qstate cfg filters
      -- Select the jobs to run.
      count <- getMaxRunningJobs qstate
      chosen <- atomicModifyIORef (jqJobs qstate)
                  (selectJobsToRun count filters)
      let jobs = map jJob chosen
      unless (null chosen) . logInfo $
        "Starting jobs: " ++ commaJoin (map (show . fromJobId . qjId) jobs)
      -- Attach the watchers.
      mapM_ (attachWatcher qstate) chosen
      -- Start the jobs; collect the ones that failed to start.
      result <- JQ.startJobs (jqLivelock qstate) (jqForkLock qstate) jobs
      let failed = [ (jws, err) | (jws, Bad err) <- zip chosen result ]
      unless (null failed) $ failJobs cfg qstate failed
427
-- | Render the ids of the waiting and running jobs in a compact, human
-- readable line.
showQueue :: Queue -> String
showQueue Queue { qEnqueued = waiting, qRunning = running } =
  concat [ "Waiting jobs: ", jobIds waiting
         , "; running jobs: ", jobIds running ]
  where
    jobIds = show . map (fromJobId . qjId . jJob)
434
-- | Check if a job died, and clean up if so. Return True, if
-- the job was found dead.
--
-- Jobs without a livelock, or carrying the livelock of this queue
-- ('jqLivelock'), are never considered dead. For a dead job that is not
-- finalized on disk, a failed status is written back; if that fails, the
-- error is logged instead of being silently dropped.
checkForDeath :: JQStatus -> JobWithStat -> IO Bool
checkForDeath state jobWS = do
  let job = jJob jobWS
      jid = qjId job
      sjid = show $ fromJobId jid
      livelock = qjLivelock job
  logDebug $ "Livelock of job " ++ sjid ++ " is " ++ show livelock
  died <- maybe (return False) isDead
          . mfilter (/= jqLivelock state)
          $ livelock
  logDebug $ "Death of " ++ sjid ++ ": " ++ show died
  when died $ do
    logInfo $ "Detected death of job " ++ sjid
    -- if we manage to remove the job from the queue, we own the job file
    -- and can manipulate it.
    outcome <- manipulateRunningJob state jid . runResultT $ do
      jobWS' <- mkResultT $ readJobFromDisk jid :: ResultG JobWithStat
      unless (jobFinalized . jJob $ jobWS') . void $ do
        -- If the job isn't finalized, but dead, add a corresponding
        -- failed status.
        now <- liftIO currentTimestamp
        qDir <- liftIO queueDir
        let reason = ( "gnt:daemon:luxid:deathdetection"
                     , "detected death of job " ++ sjid
                     , reasonTrailTimestamp now )
            failedJob = failQueuedJob reason now $ jJob jobWS'
        cfg <- mkResultT . readIORef $ jqConfig state
        writeAndReplicateJob cfg qDir failedJob
    -- A failure here means the failed status may not have been recorded.
    case outcome of
      Just (Bad err) -> logWarning $ "Failed to mark dead job " ++ sjid
                                     ++ " as failed: " ++ show err
      _ -> return ()
  return died
466
-- | Run an extra death check for the job with the given id. Returns True
-- if the job is dead, and also if it is not among the running jobs.
cleanupIfDead :: JQStatus -> JobId -> IO Bool
cleanupIfDead state jid = do
  logDebug $ "Extra job-death detection for " ++ show (fromJobId jid)
  queue <- readIORef (jqJobs state)
  case find ((jid ==) . qjId . jJob) (qRunning queue) of
    Nothing -> return True
    Just jobWS -> checkForDeath state jobWS
475
-- | Time-based watcher for the job queue; loops forever. Every
-- 'watchInterval' microseconds it checks the running jobs for death,
-- rereads them, cleans up finished jobs, logs the queue and schedules.
onTimeWatcher :: JQStatus -> IO ()
onTimeWatcher qstate = forever $ do
  threadDelay watchInterval
  logDebug "Job queue watcher timer fired"
  readIORef (jqJobs qstate) >>= mapM_ (checkForDeath qstate) . qRunning
  readIORef (jqJobs qstate) >>= mapM_ (updateJob qstate) . qRunning
  cleanupFinishedJobs qstate
  readIORef (jqJobs qstate) >>= logInfo . showQueue
  scheduleSomeJobs qstate
  logDebug "Job queue watcher cycle finished"
490
-- | Load a single live (non-archived) job, given by its id, from disk.
-- The result has no inotify handle; its fstat is taken from the file, or
-- is 'nullFStat' if the file could not be stat'ed.
readJobFromDisk :: JobId -> IO (Result JobWithStat)
readJobFromDisk jid = do
  qdir <- queueDir
  let fpath = liveJobFile qdir jid
  logDebug $ "Reading " ++ fpath
  statAttempt <- try (getFStat fpath) :: IO (Either IOError FStat)
  let fstat = either (const nullFStat) id statAttempt
      attachStat job = JobWithStat { jINotify = Nothing
                                   , jStat = fstat
                                   , jJob = job }
  loaded <- JQ.loadJobFromDisk qdir False jid
  return $ fmap (attachStat . fst) loaded
501
-- | Read all non-archived jobs from disk, in sorted job id order. Jobs
-- that cannot be loaded are skipped; finalized jobs are included.
readJobsFromDisk :: IO [JobWithStat]
readJobsFromDisk = do
  logInfo "Loading job queue"
  qdir <- queueDir
  jidsResult <- JQ.getJobIDs [qdir]
  let jids = genericResult (const []) JQ.sortJobIDs jidsResult
  logInfo $ "Non-archived jobs on disk: "
            ++ commaJoin (map (show . fromJobId) jids)
  justOk <$> mapM readJobFromDisk jids
513
-- | Set up the job scheduler. Non-finalized jobs found on disk are put
-- into the running or enqueued part depending on whether they already
-- started, a first scheduling round is run, and the time-based watcher
-- monitoring the running jobs is forked.
initJQScheduler :: JQStatus -> IO ()
initJQScheduler qstate = do
  jobsOnDisk <- readJobsFromDisk
  let pending = filter (not . jobFinalized . jJob) jobsOnDisk
      (running, queued) = partition (jobStarted . jJob) pending
  modifyJobs qstate $ onRunningJobs (++ running) . onQueuedJobs (++ queued)
  readIORef (jqJobs qstate) >>= logInfo . showQueue
  scheduleSomeJobs qstate
  logInfo "Starting time-based job queue watcher"
  void . forkIO $ onTimeWatcher qstate
528
-- | Enqueue new jobs. This will guarantee that the jobs will be executed
-- eventually. Each job is inserted into the enqueued part of the queue at
-- the position determined by comparing job ids ('insertBy'); afterwards
-- the scheduler is run.
enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO ()
enqueueNewJobs state jobs = do
  logInfo . (++) "New jobs enqueued: " . commaJoin
    $ map (show . fromJobId . qjId) jobs
  let jobs' = map unreadJob jobs
      insertFn = insertBy (compare `on` fromJobId . qjId . jJob)
      -- strict left fold, so no chain of thunks is built over the queue
      addJobs oldjobs = foldl' (flip insertFn) oldjobs jobs'
  modifyJobs state (onQueuedJobs addJobs)
  scheduleSomeJobs state
540
-- | Pure function for removing a queued job from the job queue by
-- atomicModifyIORef. The answer is Just the job if the job could be removed
-- before being handed over to execution, Nothing if it already was started
-- (i.e., is in the running part), and a Bad result if the job is not found
-- in the queue or, inconsistently, is queued more than once; in the Bad
-- and Nothing cases the queue is returned unchanged.
rmJob :: JobId -> Queue -> (Queue, Result (Maybe QueuedJob))
rmJob jid q =
  let isJid = (jid ==) . qjId . jJob
      (found, queued') = partition isJid $ qEnqueued q
      isRunning = any isJid $ qRunning q
      sJid = (++) "Job " . show $ fromJobId jid
  in case (found, isRunning) of
       ([job], _) -> (q {qEnqueued = queued'}, Ok . Just $ jJob job)
       -- note the trailing space: the message continues with "Job <id>"
       (_:_, _) -> (q, Bad $ "Queue in inconsistent state. "
                             ++ sJid ++ " queued multiple times")
       (_, True) -> (q, Ok Nothing)
       _ -> (q, Bad $ sJid ++ " not found in queue")
557
-- | Try to take a queued job out of the job queue. Yields True if it was
-- removed before being handed over to execution, False if it already
-- started, and a Bad result if the job is unknown.
dequeueJob :: JQStatus -> JobId -> IO (Result Bool)
dequeueJob state jid = do
  removed <- atomicModifyIORef (jqJobs state) (rmJob jid)
  let outcome = isJust <$> removed
  logDebug $ "Result of dequeing job " ++ show (fromJobId jid)
             ++ " is " ++ show outcome
  return outcome
569
-- | Change the priority of a queued job (once the job is handed over
-- to execution, the job itself needs to be informed). To avoid the
-- job being started unmodified, it is temporarily unqueued during the
-- change. Return the modified job, if the job's priority was successfully
-- modified, Nothing, if the job already started, and a Bad value, if the
-- job is unknown or the modified job could not be written to disk. In the
-- latter case the unmodified job is enqueued again, so it is not lost.
setJobPriority :: JQStatus -> JobId -> Int -> IO (Result (Maybe QueuedJob))
setJobPriority state jid prio = runResultT $ do
  maybeJob <- mkResultT . atomicModifyIORef (jqJobs state) $ rmJob jid
  case maybeJob of
    Nothing -> return Nothing
    Just job -> do
      let job' = changeJobPriority prio job
      qDir <- liftIO queueDir
      mkResultT $ do
        written <- writeJobToDisk qDir job'
        -- The job is no longer in the queue at this point; put the
        -- original back if the modified one couldn't be persisted.
        case written of
          Bad _ -> enqueueNewJobs state [job]
          Ok _ -> return ()
        return written
      liftIO $ enqueueNewJobs state [job']
      return $ Just job'
587
588
-- | Given the old and the new configuration, decide whether the
-- differences between them warrant a new scheduler run.
configChangeNeedsRescheduling :: ConfigData -> ConfigData -> Bool
configChangeNeedsRescheduling oldConfig newConfig = or triggers
  where
    -- Each entry is one reason to reschedule.
    triggers =
      [ changed configFilters -- the filter rules changed
      ]
    changed field = field oldConfig /= field newConfig