5c79843a81f0db3a75105081d805b76266c17262
[ganeti-github.git] / src / Ganeti / JQScheduler.hs
1 {-# LANGUAGE RankNTypes #-}
2 {-| Implementation of a reader for the job queue.
3
4 -}
5
6 {-
7
8 Copyright (C) 2013 Google Inc.
9 All rights reserved.
10
11 Redistribution and use in source and binary forms, with or without
12 modification, are permitted provided that the following conditions are
13 met:
14
15 1. Redistributions of source code must retain the above copyright notice,
16 this list of conditions and the following disclaimer.
17
18 2. Redistributions in binary form must reproduce the above copyright
19 notice, this list of conditions and the following disclaimer in the
20 documentation and/or other materials provided with the distribution.
21
22 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
23 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
24 TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
25 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
26 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
27 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
28 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
29 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
30 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
31 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
32 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33
34 -}
35
36 module Ganeti.JQScheduler
37 ( JQStatus
38 , jqLivelock
39 , emptyJQStatus
40 , selectJobsToRun
41 , scheduleSomeJobs
42 , initJQScheduler
43 , enqueueNewJobs
44 , dequeueJob
45 , setJobPriority
46 , cleanupIfDead
47 , updateStatusAndScheduleSomeJobs
48 , configChangeNeedsRescheduling
49 ) where
50
51 import Prelude ()
52 import Ganeti.Prelude
53
54 import Control.Applicative (liftA2)
55 import Control.Arrow
56 import Control.Concurrent
57 import Control.Exception
58 import Control.Monad ( when
59 , mfilter
60 , liftM
61 , void
62 , unless
63 , forever
64 , forM_)
65 import Control.Monad.IO.Class
66 import Data.Function (on)
67 import Data.IORef (IORef, atomicModifyIORef, newIORef, readIORef)
68 import Data.List ( find
69 , deleteFirstsBy
70 , sortBy
71 , intercalate
72 , partition
73 , insertBy)
74 import Data.Maybe
75 import qualified Data.Map as Map
76 import Data.Ord (comparing)
77 import Data.Set (Set)
78 import qualified Data.Set as S
79 import System.INotify
80
81 import Ganeti.BasicTypes
82 import Ganeti.Compat
83 import Ganeti.Constants as C
84 import Ganeti.Errors
85 import Ganeti.JQScheduler.Filtering (applyingFilter, jobFiltering)
86 import Ganeti.JQScheduler.Types
87 import Ganeti.JQScheduler.ReasonRateLimiting (reasonRateLimit)
88 import Ganeti.JQueue as JQ
89 import Ganeti.JQueue.LockDecls
90 import Ganeti.JSON (fromContainer)
91 import Ganeti.Lens hiding (chosen)
92 import Ganeti.Logging
93 import Ganeti.Objects
94 import Ganeti.OpCodes
95 import Ganeti.Path
96 import Ganeti.Query.Exec (forkPostHooksProcess)
97 import Ganeti.Types
98 import Ganeti.Utils
99 import Ganeti.Utils.Livelock
100 import Ganeti.Utils.MVarLock
101
102
{-| Representation of the job queue scheduler status.

The jobs are kept in a single 'Queue' (together with information about
the last fstat result observed for each job). The queue is split into the
jobs that are enqueued but not yet handed over for execution
('qEnqueued'), the jobs already handed over for execution ('qRunning'),
and running jobs that are temporarily taken out of 'qRunning' while being
manipulated ('qManipulated'). All of them are kept together in a single
IORef, so that we can atomically update them, in particular when
scheduling jobs to be handed over for execution.

-}

data JQStatus = JQStatus
  { jqJobs :: IORef Queue
    -- ^ The job queue (enqueued, running and manipulated jobs).
  , jqConfig :: IORef (Result ConfigData)
    -- ^ The current configuration, or the error from obtaining it.
  , jqLivelock :: Livelock
    -- ^ Livelock of this scheduler; jobs carrying this same livelock are
    -- not subjected to death detection.
  , jqForkLock :: Lock
    -- ^ Lock passed to 'JQ.startJobs' and held while forking post-hooks
    -- processes.
  }
119
120
-- | Create a scheduler status with an empty job queue for the given
-- configuration reference. A fresh livelock file (using the luxi livelock
-- prefix) and a new fork lock are created as well.
emptyJQStatus :: IORef (Result ConfigData) -> IO JQStatus
emptyJQStatus config = do
  let noJobs = Queue { qEnqueued = [], qRunning = [], qManipulated = [] }
  jobsRef <- newIORef noJobs
  (_, livelock) <- mkLivelockFile C.luxiLivelockPrefix
  forkLock <- newLock
  return JQStatus { jqForkLock = forkLock
                  , jqLivelock = livelock
                  , jqConfig = config
                  , jqJobs = jobsRef
                  }
128
-- When updating the job lists, force the elements to WHNF, otherwise it is
-- easy to leak the resources held onto by the lazily parsed job file.
-- This can happen, e.g., if updateJob is called, but the resulting QueuedJob
-- isn't used by the scheduler, for example when the inotify watcher or the
-- polling loop re-reads a job with a new message appended to it.
134
-- | Apply a function to the list of running jobs of a queue. Before the
-- function is applied, the elements of the current list are forced to WHNF.
onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onRunningJobs f q@Queue { qRunning = running } =
  q { qRunning = forced `seq` f running }
  where
    -- force list elements to WHNF
    forced = foldr seq () running
140
-- | Apply a function to the list of enqueued jobs of a queue. Before the
-- function is applied, the elements of the current list are forced to WHNF.
onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
onQueuedJobs f q@Queue { qEnqueued = enqueued } =
  q { qEnqueued = forced `seq` f enqueued }
  where
    -- force list elements to WHNF
    forced = foldr seq () enqueued
146
-- | Wrap a 'QueuedJob' into a 'JobWithStat' that has not been read from
-- disk yet: its stat is 'nullFStat' and no inotify watcher is attached.
unreadJob :: QueuedJob -> JobWithStat
unreadJob job =
  JobWithStat { jINotify = Nothing, jStat = nullFStat, jJob = job }
150
-- | Read a cluster parameter from the current configuration. If the
-- configuration is not available, the given default is returned instead.
getConfigValue :: (Cluster -> a) -> a -> JQStatus -> IO a
getConfigValue param defaultvalue qstat = do
  cfgResult <- readIORef (jqConfig qstat)
  return $ genericResult (const defaultvalue) (param . configCluster) cfgResult
157
-- | Get the maximal number of jobs to be run simultaneously from the
-- configuration. If the configuration is not available, be conservative
-- and use the smallest possible value, i.e., 1.
getMaxRunningJobs :: JQStatus -> IO Int
getMaxRunningJobs =
  getConfigValue clusterMaxRunningJobs conservativeDefault
  where conservativeDefault = 1
163
-- | Get the maximal number of jobs to be tracked simultaneously from the
-- configuration. If the configuration is not available, be conservative
-- and use the smallest possible value, i.e., 1.
getMaxTrackedJobs :: JQStatus -> IO Int
getMaxTrackedJobs =
  getConfigValue clusterMaxTrackedJobs conservativeDefault
  where conservativeDefault = 1
169
-- | Get the run queue length, i.e., the number of jobs in the running
-- part of the queue.
getRQL :: JQStatus -> IO Int
getRQL qstat = do
  queue <- readIORef (jqJobs qstat)
  return . length $ qRunning queue
173
-- | Atomically apply a transformation to the job queue of the status.
-- The strict variant of 'atomicModifyIORef' is used, so the new queue
-- is forced to WHNF when it is stored.
modifyJobs :: JQStatus -> (Queue -> Queue) -> IO ()
modifyJobs qstat f =
  atomicModifyIORef' (jqJobs qstat) $ \queue -> (f queue, ())
177
-- | Re-read a job from disk if its file changed since the recorded fstat.
--
-- Returns 'Nothing' if the file is unchanged or the job cannot be loaded,
-- and otherwise the job updated with the freshly read contents and the new
-- fstat. The inotify handle of the job is kept as is. An 'IOError' while
-- checking the file is treated as a change (with 'nullFStat'), which forces
-- a re-read attempt.
readJobStatus :: JobWithStat -> IO (Maybe JobWithStat)
readJobStatus jWS@(JobWithStat {jStat=fstat, jJob=job}) = do
  qdir <- queueDir
  let jid = qjId job
      jids = show $ fromJobId jid
      fpath = liveJobFile qdir jid
  logDebug $ "Checking if " ++ fpath ++ " changed on disk."
  checked <- try (needsReload fstat fpath)
               :: IO (Either IOError (Maybe FStat))
  case either (const $ Just nullFStat) id checked of
    Nothing -> do
      logDebug $ "File " ++ fpath ++ " not changed on disk."
      return Nothing
    Just newStat -> do
      logDebug $ "Rereading job " ++ jids
      loaded <- loadJobFromDisk qdir True jid
      case loaded of
        Ok (newJob, _) -> do
          logDebug $ "Read job " ++ jids ++ ", status is "
                     ++ show (calcJobStatus newJob)
          return $ Just jWS { jStat = newStat, jJob = newJob }
        Bad s -> do
          logWarning $ "Failed to read job " ++ jids ++ ": " ++ s
          return Nothing
205
-- | Replace a job in a list of jobs by its updated version, matching on
-- the job id. Jobs with other ids are left untouched, and nothing happens
-- if the job is no longer in the list.
--
-- This is the pure part of inserting a previously read change into the
-- queue. The change carries the fstat it was read with, so we don't have to
-- worry about a later-read change overwriting a newer state. If that
-- happens, the stored fstat is outdated and the next poller run fixes it.
updateJobStatus :: JobWithStat -> [JobWithStat] -> [JobWithStat]
updateJobStatus newJob = map replace
  where
    newId = qjId $ jJob newJob
    replace old
      | qjId (jJob old) == newId = newJob
      | otherwise = old
215
-- | Update a single running job by re-reading it from disk, if necessary.
--
-- If the job was re-read, its entry in the running queue is replaced. A
-- background thread then cleans up finished jobs and schedules new ones
-- whenever the re-read job is finalized, or when 'readJobStatus' returned
-- 'Nothing' (no change detected, or the job could not be loaded).
updateJob :: JQStatus -> JobWithStat -> IO ()
updateJob state jb = do
  reread <- readJobStatus jb
  case reread of
    Just jb' -> modifyJobs state . onRunningJobs $ updateJobStatus jb'
    Nothing -> return ()
  let triggerCleanup = maybe True (jobFinalized . jJob) reread
  when triggerCleanup . void . forkIO $ do
    logDebug "Scheduler noticed a job to have finished."
    cleanupFinishedJobs state
    scheduleSomeJobs state
225
-- | Move a job from one part of the queue to another.
-- Return the job that was moved, or 'Nothing' if it wasn't found in
-- the source part of the queue (in which case the queue is returned
-- unchanged). The moved job is appended to the end of the destination list.
--
-- NOTE(review): 'partition' removes every entry with the given id from the
-- source list, but only the first one is appended to the destination, so
-- duplicates would be dropped. Job ids are presumably unique within a list,
-- making this unobservable in practice; confirm.
moveJob :: Lens' Queue [JobWithStat] -- ^ from queue
        -> Lens' Queue [JobWithStat] -- ^ to queue
        -> JobId
        -> Queue
        -> (Queue, Maybe JobWithStat)
moveJob fromQ toQ jid queue =
  -- traverse over the @(,) [JobWithStats]@ functor to extract the job:
  -- the first component collects the matching jobs, the second is the
  -- queue with the non-matching jobs left in the source list
  case traverseOf fromQ (partition ((== jid) . qjId . jJob)) queue of
    (job : _, queue') -> (over toQ (++ [job]) queue', Just job)
    _ -> (queue, Nothing)
239
-- | Atomically move a job from one part of the queue to another, as
-- described for 'moveJob'. Yields the moved job, or 'Nothing' if no job
-- with the given id was in the source part of the queue.
moveJobAtomic :: Lens' Queue [JobWithStat] -- ^ from queue
              -> Lens' Queue [JobWithStat] -- ^ to queue
              -> JobId
              -> JQStatus
              -> IO (Maybe JobWithStat)
moveJobAtomic fromQ toQ jid qstat =
  let move = moveJob fromQ toQ jid
  in atomicModifyIORef (jqJobs qstat) move
250
-- | Manipulate a running job: atomically move it from 'qRunning' into
-- 'qManipulated', run the given IO action, and then (even if the action
-- throws) atomically move it back.
--
-- Returns the result of the IO action wrapped in 'Just', or 'Nothing'
-- without running the action if the job wasn't found among the running
-- jobs.
manipulateRunningJob :: JQStatus -> JobId -> IO a -> IO (Maybe a)
manipulateRunningJob qstat jid k = do
  taken <- moveJobAtomic qRunningL qManipulatedL jid qstat
  if isNothing taken
    then return Nothing
    else liftM Just k `finally` putBack
  where
    putBack = moveJobAtomic qManipulatedL qRunningL jid qstat
264
-- | Split the finalized jobs off the running part of the queue. This is
-- the pure part of 'cleanupFinishedJobs': it returns the queue without
-- those jobs, together with the jobs that were removed.
sortoutFinishedJobs :: Queue -> (Queue, [JobWithStat])
sortoutFinishedJobs queue = (queue { qRunning = stillRunning }, finished)
  where
    (finished, stillRunning) = partition (jobFinalized . jJob) (qRunning queue)
272
-- | Remove finalized jobs from the running queue (using the pure
-- 'sortoutFinishedJobs'), log them if there are any, and kill the inotify
-- watchers attached to them.
cleanupFinishedJobs :: JQStatus -> IO ()
cleanupFinishedJobs qstate = do
  finished <- atomicModifyIORef (jqJobs qstate) sortoutFinishedJobs
  let describe = show . ((fromJobId . qjId) &&& calcJobStatus) . jJob
  unless (null finished) . logInfo $
    "Finished jobs: " ++ commaJoin (map describe finished)
  forM_ finished $ \jws -> maybe (return ()) killINotify (jINotify jws)
283
-- | Watcher task for a job, updating it on file changes.
--
-- On an 'Ignored' event (the watch was removed), the watch is
-- re-established on the job's live file using the job's existing inotify
-- handle, if it has one. In every case the job is then re-read via
-- 'updateJob'.
jobWatcher :: JQStatus -> JobWithStat -> Event -> IO ()
jobWatcher state jWS e = do
  let jid = qjId $ jJob jWS
      jids = show $ fromJobId jid
  logInfo $ "Scheduler notified of change of job " ++ jids
  logDebug $ "Scheduler notify event for " ++ jids ++ ": " ++ show e
  -- Pattern-match instead of guarding 'fromJust' with 'isJust', so that the
  -- re-watch branch is total by construction.
  case (e, jINotify jWS) of
    (Ignored, Just inotify) -> do
      qdir <- queueDir
      let fpath = liveJobFile qdir jid
      void $ addWatch inotify [Modify, Delete] fpath (jobWatcher state jWS)
    _ -> return ()
  updateJob state jWS
300
-- | Attach an inotify job watcher to a running job, unless it already has
-- one.
--
-- A watcher is only attached while the run queue is shorter than the
-- configured maximal number of tracked jobs. In that case the job's entry
-- in the running queue is replaced by one carrying the new inotify handle.
attachWatcher :: JQStatus -> JobWithStat -> IO ()
attachWatcher state jWS = unless (isJust $ jINotify jWS) $ do
  maxWatch <- getMaxTrackedJobs state
  runLen <- getRQL state
  let jid = qjId $ jJob jWS
  if runLen >= maxWatch
    then logDebug $ "Not attaching watcher for job "
                    ++ show (fromJobId jid)
                    ++ ", run queue length is " ++ show runLen
    else do
      inotify <- initINotify
      qdir <- queueDir
      let fpath = liveJobFile qdir jid
          watched = jWS { jINotify = Just inotify }
      logDebug $ "Attaching queue watcher for " ++ fpath
      _ <- addWatch inotify [Modify, Delete] fpath $ jobWatcher state watched
      modifyJobs state . onRunningJobs $ updateJobStatus watched
318
-- | For a queued job, determine whether it is eligible to run, i.e.,
-- whether none of the jobs it depends on is still enqueued or running.
jobEligible :: Queue -> JobWithStat -> Bool
jobEligible queue jWS = all (not . isDependency) pending
  where
    deps = getJobDependencies $ jJob jWS
    isDependency = (`elem` deps) . qjId . jJob
    pending = liftA2 (++) qRunning qEnqueued queue
326
-- | The opcode of the first meta opcode of a job's first queued opcode,
-- or 'Nothing' if the job has no opcodes or no meta opcode.
extractFirstOpCode :: JobWithStat -> Maybe OpCode
extractFirstOpCode job = do
  firstOp <- listToMaybe . qjOps $ jJob job
  firstMeta <- listToMaybe . JQ.toMetaOpCode $ qoInput firstOp
  return $ metaOpCode firstMeta
332
-- | Sort the given jobs by their static lock weight in relation to the
-- first opcodes of the currently running jobs, adjusted by how long ago
-- each job was received (jobs without a receive timestamp use
-- 'noTimestamp').
sortByStaticLocks :: ConfigData
                  -> Queue
                  -> Timestamp -- Current time
                  -> [JobWithStat]
                  -> [JobWithStat]
sortByStaticLocks cfg queue currTime = sortBy (comparing opWeight)
  where
    runningOps = mapMaybe extractFirstOpCode (qRunning queue)
    recvTime = fromMaybe noTimestamp . qjReceivedTimestamp . jJob
    opWeight :: JobWithStat -> Double
    opWeight job =
      let baseWeight = staticWeight cfg (extractFirstOpCode job) runningOps
      in adjustedWeight currTime (recvTime job) baseWeight
346
-- | Decide which jobs to schedule next for execution. This is the pure
-- part of the scheduling.
--
-- Enqueued jobs are ordered by static lock weight and restricted to the
-- eligible ones. They are then stably sorted by priority, so lock weight
-- remains the secondary key. Reason rate limiting and the filter rules are
-- applied next, and as many jobs are taken as there are free slots (the
-- limit minus running and manipulated jobs). The chosen jobs are moved from
-- the enqueued to the running part of the queue and also returned.
selectJobsToRun :: ConfigData
                -> Int -- How many jobs are allowed to run at the same time.
                -> Timestamp -- Current time
                -> Set FilterRule -- Filter rules to respect for scheduling
                -> Queue
                -> (Queue, [JobWithStat])
selectJobsToRun cfg count currTime filters queue =
  (queue { qEnqueued = remain, qRunning = qRunning queue ++ chosen }, chosen)
  where
    enqueued = qEnqueued queue
    slots = count - length (qRunning queue) - length (qManipulated queue)
    candidates = filter (jobEligible queue)
                   $ sortByStaticLocks cfg queue currTime enqueued
    byPriority = sortBy (comparing (calcJobPriority . jJob)) candidates
    chosen = take slots . jobFiltering queue filters
               $ reasonRateLimit queue byPriority
    sameId = (==) `on` (qjId . jJob)
    remain = deleteFirstsBy sameId enqueued chosen
366
-- | Log a warning about jobs that failed to start, together with their
-- errors, and return the set of their job IDs. Nothing is logged for an
-- empty list.
logFailedJobs :: (MonadLog m)
              => [(JobWithStat, GanetiException)] -> m (S.Set JobId)
logFailedJobs jobs
  | null jobs = return S.empty
  | otherwise = do
      logWarning $ "Starting jobs " ++ jidsString ++ " failed: "
                   ++ show (map snd jobs)
      return jids
  where
    jids = S.fromList $ map (qjId . jJob . fst) jobs
    jidsString = commaJoin . map (show . fromJobId) $ S.toList jids
377
-- | Fail jobs that were previously selected for execution but couldn't be
-- started.
--
-- The jobs are removed from the running part of the queue. Any inotify
-- watcher attached to a queue entry (see 'attachWatcher') is killed. Each
-- job is then marked as failed, with a reason-trail entry describing the
-- start error, and written back. A failure to save an individual job is
-- logged instead of being silently dropped.
failJobs :: ConfigData -> JQStatus -> [(JobWithStat, GanetiException)]
         -> IO ()
failJobs cfg qstate jobs = do
  qdir <- queueDir
  now <- currentTimestamp
  jids <- logFailedJobs jobs
  let sjobs = intercalate ", " . map (show . fromJobId) $ S.toList jids
  logWarning $ "Failing jobs " ++ sjobs
  -- Take the jobs out of the running queue. The queue entries may carry an
  -- inotify watcher that 'attachWatcher' added after selection (the entries
  -- passed in here do not), so kill those to avoid leaking the handles.
  removed <- atomicModifyIORef' (jqJobs qstate) $ \q ->
    let (gone, kept) = partition ((`S.member` jids) . qjId . jJob) (qRunning q)
    in (q { qRunning = kept }, gone)
  mapM_ (maybe (return ()) killINotify . jINotify) removed
  let trySaveJob :: JobWithStat -> ResultT String IO ()
      trySaveJob = (() <$) . writeAndReplicateJob cfg qdir . jJob
      reason jid msg =
        ( "gnt:daemon:luxid:startjobs"
        , "job " ++ show (fromJobId jid) ++ " failed to start: " ++ msg
        , reasonTrailTimestamp now )
      failJob err job = failQueuedJob (reason (qjId job) (show err)) now job
      failAndSaveJobWithStat (jws, err) =
        trySaveJob . over jJobL (failJob err) $ jws
  forM_ jobs $ \failure@(jws, _) -> do
    saved <- runResultT $ failAndSaveJobWithStat failure
    genericResult
      (\msg -> logError $ "Failed to save failed job "
                          ++ show (fromJobId . qjId $ jJob jws) ++ ": " ++ msg)
      return
      saved
  logDebug $ "Failed jobs " ++ sjobs
401
402
-- | Check whether any enqueued jobs match a filter rule with action
-- 'Reject', and cancel them.
--
-- A matching job is first dequeued. If that succeeds, it is written back
-- as cancelled. If it was already handed over for execution, 'cancelJob'
-- is asked to cancel it instead, sending SIGTERM only.
cancelRejectedJobs :: JQStatus -> ConfigData -> Set FilterRule -> IO ()
cancelRejectedJobs qstate cfg filters = do

  enqueuedJobs <- map jJob . qEnqueued <$> readIORef (jqJobs qstate)

  -- Determine which jobs are rejected.
  let jobsToCancel =
        [ (job, fr) | job <- enqueuedJobs
                    , Just fr <- [applyingFilter filters job]
                    , frAction fr == Reject ]

  -- Cancel them.
  qDir <- queueDir
  forM_ jobsToCancel $ \(job, fr) -> do
    let jid = qjId job
        sjid = show (fromJobId jid)
    logDebug $ "Cancelling job " ++ sjid
               ++ " because it was REJECTed by filter rule " ++ uuidOf fr
    -- First dequeue, then cancel.
    dequeueResult <- dequeueJob qstate jid
    case dequeueResult of
      Ok True -> do
        now <- currentTimestamp
        r <- runResultT
               $ writeAndReplicateJob cfg qDir (cancelQueuedJob now job)
        case r of
          Ok _ -> return ()
          -- What is written here is the cancelled job, not the config.
          Bad err -> logError $
            "Failed to write job file when cancelling job " ++ sjid
            ++ ": " ++ err
      Ok False -> do
        logDebug $ "Job " ++ sjid
                   ++ " not queued; trying to cancel directly"
        _ <- cancelJob False (jqLivelock qstate) jid -- sigTERM-kill only
        return ()
      Bad s -> logError s -- passing a nonexistent job ID is an error here
438
439
-- | Schedule jobs to be run. This is the IO wrapper around the pure
-- 'selectJobsToRun'.
--
-- Without a configuration only an error is logged. Otherwise:
--
-- 1. Jobs rejected by filter rules are cancelled.
-- 2. Jobs are selected and moved to the running queue.
-- 3. Watchers are attached to the selected jobs, and the jobs are started.
-- 4. Jobs that could not be started are failed via 'failJobs'.
scheduleSomeJobs :: JQStatus -> IO ()
scheduleSomeJobs qstate =
  readIORef (jqConfig qstate) >>= genericResult reportMissing scheduleWith
  where
    reportMissing err = logError $ "Configuration unavailable: " ++ err
    scheduleWith cfg = do
      let filters = S.fromList . Map.elems . fromContainer $ configFilters cfg
      -- Check if jobs are rejected by a REJECT filter, and cancel them.
      cancelRejectedJobs qstate cfg filters
      ts <- currentTimestamp
      -- Select the jobs to run.
      count <- getMaxRunningJobs qstate
      chosen <- atomicModifyIORef (jqJobs qstate)
                  (selectJobsToRun cfg count ts filters)
      let jobs = map jJob chosen
      unless (null chosen) . logInfo $
        "Starting jobs: " ++ commaJoin (map (show . fromJobId . qjId) jobs)
      -- Attach the watchers, then start the jobs.
      mapM_ (attachWatcher qstate) chosen
      result <- JQ.startJobs (jqLivelock qstate) (jqForkLock qstate) jobs
      let failed = [ (jws, err) | (jws, Bad err) <- zip chosen result ]
      unless (null failed) $ failJobs cfg qstate failed
474
-- | Format the job queue compactly for humans, listing the ids of the
-- waiting and running jobs.
showQueue :: Queue -> String
showQueue (Queue {qEnqueued=waiting, qRunning=running}) =
  concat [ "Waiting jobs: ", ids waiting
         , "; running jobs: ", ids running ]
  where
    ids = show . map (fromJobId . qjId . jJob)
481
-- | Check if a job died, and clean up if so. Return True, if
-- the job was found dead.
--
-- Only a job whose livelock is set and differs from the scheduler's own
-- livelock is checked; otherwise it is reported as alive. For a dead job,
-- the job is temporarily moved out of the running queue (via
-- 'manipulateRunningJob'), re-read from disk and, unless it is already
-- finalized, marked as failed and written back. Any error during this
-- cleanup is discarded. If the job is no longer in the running queue, no
-- cleanup happens.
checkForDeath :: JQStatus -> JobWithStat -> IO Bool
checkForDeath state jobWS = do
  let job = jJob jobWS
      jid = qjId job
      sjid = show $ fromJobId jid
      livelock = qjLivelock job
  logDebug $ "Livelock of job " ++ sjid ++ " is " ++ show livelock
  -- A job sharing our own livelock is filtered out by 'mfilter' and thus
  -- treated as alive.
  died <- maybe (return False) isDead
          . mfilter (/= jqLivelock state)
          $ livelock
  logDebug $ "Death of " ++ sjid ++ ": " ++ show died
  when died $ do
    logInfo $ "Detected death of job " ++ sjid
    -- if we manage to remove the job from the queue, we own the job file
    -- and can manipulate it.
    void . manipulateRunningJob state jid . runResultT $ do
      jobWS' <- mkResultT $ readJobFromDisk jid :: ResultG JobWithStat
      unless (jobFinalized . jJob $ jobWS') . void $ do
        -- If the job isn't finalized, but dead, add a corresponding
        -- failed status.
        now <- liftIO currentTimestamp
        qDir <- liftIO queueDir
        let reason = ( "gnt:daemon:luxid:deathdetection"
                     , "detected death of job " ++ sjid
                     , reasonTrailTimestamp now )
            failedJob = failQueuedJob reason now $ jJob jobWS'
        cfg <- mkResultT . readIORef $ jqConfig state
        writeAndReplicateJob cfg qDir failedJob
  return died
513
-- | Trigger job-death detection for the job with the given job id.
--
-- The job counts as dead if it is no longer among the running jobs, or if
-- 'checkForDeath' finds it dead. For a dead job, a post-hooks process is
-- forked while holding the fork lock, and 'True' is returned. Otherwise
-- 'False' is returned.
cleanupIfDead :: JQStatus -> JobId -> IO Bool
cleanupIfDead state jid = do
  logDebug $ "Extra job-death detection for " ++ show (fromJobId jid)
  jobs <- readIORef (jqJobs state)
  let jobWS = find ((==) jid . qjId . jJob) $ qRunning jobs
      sjid = show $ fromJobId jid
      -- Run the post hooks for the disappeared job and log the outcome.
      -- A failure to start them is worth a warning, not just a debug line.
      runHooks = do
        r <- runResultT . withLock (jqForkLock state)
               $ forkPostHooksProcess jid
        genericResult
          (\err -> logWarning $ "Error starting post hooks process for"
                                ++ " disappeared job " ++ sjid
                                ++ ": " ++ err)
          (\pid -> logDebug $ "Post hooks for disappeared job " ++ sjid
                              ++ " have started in " ++ show pid)
          r
  dead <- maybe (return True) (checkForDeath state) jobWS
  when dead runHooks
  return dead
538
-- | Force the queue to check the state of all jobs. The steps are:
--
-- 1. Run death detection on every running job.
-- 2. Re-read the (then) running jobs from disk.
-- 3. Clean up finished jobs and log the queue.
-- 4. Schedule new jobs.
updateStatusAndScheduleSomeJobs :: JQStatus -> IO ()
updateStatusAndScheduleSomeJobs qstate = do
  let runningNow = qRunning <$> readIORef (jqJobs qstate)
  runningNow >>= mapM_ (checkForDeath qstate)
  runningNow >>= mapM_ (updateJob qstate)
  cleanupFinishedJobs qstate
  readIORef (jqJobs qstate) >>= logInfo . showQueue
  scheduleSomeJobs qstate
550
-- | Time-based watcher for the job queue. It loops forever, waiting
-- 'C.luxidJobqueuePollInterval' seconds before each full status update
-- and scheduling round.
onTimeWatcher :: JQStatus -> IO ()
onTimeWatcher qstate = forever pollOnce
  where
    pollOnce = do
      threadDelaySeconds C.luxidJobqueuePollInterval
      logDebug "Job queue watcher timer fired"
      updateStatusAndScheduleSomeJobs qstate
      logDebug "Job queue watcher cycle finished"
558
-- | Read a single non-archived job, specified by its id, from disk.
--
-- The file's fstat is taken first; if that fails with an 'IOError',
-- 'nullFStat' is recorded instead. The returned job has no inotify watcher.
readJobFromDisk :: JobId -> IO (Result JobWithStat)
readJobFromDisk jid = do
  qdir <- queueDir
  let fpath = liveJobFile qdir jid
  logDebug $ "Reading " ++ fpath
  statResult <- try (getFStat fpath) :: IO (Either IOError FStat)
  let fstat = either (const nullFStat) id statResult
      withStat job = JobWithStat { jINotify = Nothing, jStat = fstat
                                 , jJob = job }
  loaded <- JQ.loadJobFromDisk qdir False jid
  return $ withStat . fst <$> loaded
569
-- | Read all non-archived jobs from the queue directory, in the order
-- given by 'JQ.sortJobIDs'. Finalized jobs are included; callers filter
-- them out as needed. Only the jobs that were read successfully are kept
-- (via 'justOk'), and an error listing the job ids yields no jobs.
readJobsFromDisk :: IO [JobWithStat]
readJobsFromDisk = do
  logInfo "Loading job queue"
  qdir <- queueDir
  jids <- genericResult (const []) JQ.sortJobIDs <$> JQ.getJobIDs [qdir]
  logInfo $ "Non-archived jobs on disk: "
            ++ commaJoin (map (show . fromJobId) jids)
  justOk <$> mapM readJobFromDisk jids
581
-- | Set up the job scheduler.
--
-- The non-finalized jobs on disk are loaded into the queue; started jobs
-- go to the running part and the rest are enqueued. A first scheduling
-- round is then run, and the time-based queue watcher is started in a
-- separate thread.
initJQScheduler :: JQStatus -> IO ()
initJQScheduler qstate = do
  onDisk <- readJobsFromDisk
  let (running, queued) = partition (jobStarted . jJob)
                          . filter (not . jobFinalized . jJob) $ onDisk
  modifyJobs qstate $ onQueuedJobs (++ queued) . onRunningJobs (++ running)
  readIORef (jqJobs qstate) >>= logInfo . showQueue
  scheduleSomeJobs qstate
  logInfo "Starting time-based job queue watcher"
  void . forkIO $ onTimeWatcher qstate
596
-- | Enqueue new jobs, so that they will eventually be executed.
--
-- Each job is inserted into the enqueued list ordered by job id, and a
-- scheduling round is triggered afterwards.
enqueueNewJobs :: JQStatus -> [QueuedJob] -> IO ()
enqueueNewJobs state jobs = do
  logInfo $ "New jobs enqueued: "
            ++ commaJoin (map (show . fromJobId . qjId) jobs)
  modifyJobs state (onQueuedJobs insertAll)
  scheduleSomeJobs state
  where
    byId = compare `on` (fromJobId . qjId . jJob)
    insertAll queued =
      foldl (\acc j -> insertBy byId j acc) queued (map unreadJob jobs)
608
-- | Pure function for removing a queued job from the job queue, for use
-- with 'atomicModifyIORef'.
--
-- The answer is 'Just' the job if the job could be removed before being
-- handed over to execution, and 'Nothing' if it was already started. A
-- 'Bad' result is given if the job is not found in the queue or is
-- enqueued more than once. In the 'Bad' and 'Nothing' cases the queue is
-- returned unchanged.
rmJob :: JobId -> Queue -> (Queue, Result (Maybe QueuedJob))
rmJob jid q =
  let isJid = (jid ==) . qjId . jJob
      (found, queued') = partition isJid $ qEnqueued q
      isRunning = any isJid $ qRunning q
      sJid = (++) "Job " . show $ fromJobId jid
  in case (found, isRunning) of
       ([job], _) -> (q {qEnqueued = queued'}, Ok . Just $ jJob job)
       -- Separate the two sentences of the message with a space.
       (_:_, _) -> (q, Bad $ "Queue in inconsistent state. "
                             ++ sJid ++ " queued multiple times")
       (_, True) -> (q, Ok Nothing)
       _ -> (q, Bad $ sJid ++ " not found in queue")
625
-- | Try to remove a queued job from the job queue. Return True, if
-- the job could be removed from the queue before being handed over
-- to execution, False if the job already started, and a Bad result
-- if the job is unknown (or, see 'rmJob', queued multiple times).
dequeueJob :: JQStatus -> JobId -> IO (Result Bool)
dequeueJob state jid = do
  result <- atomicModifyIORef (jqJobs state) $ rmJob jid
  let result' = fmap isJust result
  logDebug $ "Result of dequeuing job " ++ show (fromJobId jid)
             ++ " is " ++ show result'
  return result'
637
-- | Change the priority of a queued job. Once the job is handed over to
-- execution, the job itself needs to be informed instead. To avoid the
-- job being started unmodified, it is temporarily unqueued during the
-- change.
--
-- Returns the modified job if its priority was successfully modified,
-- 'Nothing' if the job already started, and a 'Bad' value if the job is
-- unknown or the modified job could not be written to disk.
--
-- Once unqueued, the job is always put back into the queue. If writing the
-- modified job fails, the unmodified job is re-enqueued, so a failed write
-- does not make the scheduler lose track of the job.
setJobPriority :: JQStatus -> JobId -> Int -> IO (Result (Maybe QueuedJob))
setJobPriority state jid prio = runResultT $ do
  maybeJob <- mkResultT . atomicModifyIORef (jqJobs state) $ rmJob jid
  case maybeJob of
    Nothing -> return Nothing
    Just job -> do
      let job' = changeJobPriority prio job
      qDir <- liftIO queueDir
      written <- liftIO $ writeJobToDisk qDir job'
      -- Re-enqueue before propagating a possible write error.
      let requeued = genericResult (const job) (const job') written
      liftIO $ enqueueNewJobs state [requeued]
      mkResultT $ return written
      return $ Just job'
655
656
-- | Given the old and new configuration, decide whether the change should
-- trigger a scheduler run. This is the case if the filter rules or the
-- maximal number of running jobs (the run queue length) differ.
configChangeNeedsRescheduling :: ConfigData -> ConfigData -> Bool
configChangeNeedsRescheduling old new =
  filtersChanged || maxRunningJobsChanged
  where
    filtersChanged = configFilters old /= configFilters new
    maxRunningJobsChanged =
      clusterMaxRunningJobs (configCluster old)
        /= clusterMaxRunningJobs (configCluster new)