Implement starvation-prevention mechanism in queue
authorFederico Morg Pareschi <morg@google.com>
Fri, 16 Dec 2016 13:51:41 +0000 (13:51 +0000)
committerFederico Morg Pareschi <morg@google.com>
Fri, 16 Dec 2016 15:12:52 +0000 (15:12 +0000)
This patch adds a starvation prevention mechanism to the Ganeti
predictive job queue. It calculates the age of submitted jobs
comparing the current time to the submission time and appropriately
adjusts the job's lock weight accordingly to avoid starvation.

Signed-off-by: Federico Morg Pareschi <morg@google.com>
Reviewed-by: Brian Foley <bpfoley@google.com>

src/Ganeti/Constants.hs
src/Ganeti/JQScheduler.hs
src/Ganeti/JQueue/LockDecls.hs

index 1e0b877..bee8af3 100644 (file)
@@ -5634,3 +5634,14 @@ staticLockMaybeBlockWeight = 1.5
 -- | Weight assigned to two locks that will surely conflict.
 staticLockSureBlockWeight :: Double
 staticLockSureBlockWeight = 3
+
+-- | How many seconds make up a "tick" in the job queue starvation prevention
+-- system.
+jobQueueTickInSeconds :: Double
+jobQueueTickInSeconds = 30
+
+-- | Job queue starvation prevention coefficient. This means how many "ticks"
+-- of time need to pass before a job has 100% certainty to be in front of the
+-- queue.
+jobQueueStarvationCoeff :: Double
+jobQueueStarvationCoeff = 30
index ad54e23..5c79843 100644 (file)
@@ -332,28 +332,34 @@ extractFirstOpCode job =
 
 -- | Sort the given job queue by its static lock weight in relation to the
 -- currently running jobs.
-sortByStaticLocks :: ConfigData -> Queue -> [JobWithStat] -> [JobWithStat]
-sortByStaticLocks cfg queue = sortBy (compare `on` opWeight)
+sortByStaticLocks :: ConfigData
+                  -> Queue
+                  -> Timestamp -- Current time
+                  -> [JobWithStat]
+                  -> [JobWithStat]
+sortByStaticLocks cfg queue currTime = sortBy (compare `on` opWeight)
   where opWeight :: JobWithStat -> Double
-        opWeight job = staticWeight cfg (extractFirstOpCode job) runningOps
+        opWeight job = adjustedWeight currTime (recvTime job)
+                       . staticWeight cfg (extractFirstOpCode job) $ runningOps
+        recvTime = fromMaybe noTimestamp . qjReceivedTimestamp . jJob
         runningOps = catMaybes . (fmap extractFirstOpCode) . qRunning $ queue
 
 -- | Decide on which jobs to schedule next for execution. This is the
 -- pure function doing the scheduling.
 selectJobsToRun :: ConfigData
-                -> Int  -- ^ How many jobs are allowed to run at the
-                        -- same time.
-                -> Set FilterRule -- ^ Filter rules to respect for scheduling
+                -> Int -- How many jobs are allowed to run at the same time.
+                -> Timestamp -- Current time
+                -> Set FilterRule -- Filter rules to respect for scheduling
                 -> Queue
                 -> (Queue, [JobWithStat])
-selectJobsToRun cfg count filters queue =
+selectJobsToRun cfg count currTime filters queue =
   let n = count - length (qRunning queue) - length (qManipulated queue)
       chosen = take n
                . jobFiltering queue filters
                . reasonRateLimit queue
                . sortBy (comparing (calcJobPriority . jJob))
                . filter (jobEligible queue)
-               . sortByStaticLocks cfg queue
+               . sortByStaticLocks cfg queue currTime
                $ qEnqueued queue
       remain = deleteFirstsBy ((==) `on` (qjId . jJob)) (qEnqueued queue) chosen
   in (queue {qEnqueued=remain, qRunning=qRunning queue ++ chosen}, chosen)
@@ -446,10 +452,12 @@ scheduleSomeJobs qstate = do
       -- Check if jobs are rejected by a REJECT filter, and cancel them.
       cancelRejectedJobs qstate cfg filters
 
+      ts <- currentTimestamp
+
       -- Select the jobs to run.
       count <- getMaxRunningJobs qstate
       chosen <- atomicModifyIORef (jqJobs qstate)
-                                  (selectJobsToRun cfg count filters)
+                                  (selectJobsToRun cfg count ts filters)
       let jobs = map jJob chosen
       unless (null chosen) . logInfo . (++) "Starting jobs: " . commaJoin
         $ map (show . fromJobId . qjId) jobs
index 4a26cb0..1b1f585 100644 (file)
@@ -33,7 +33,9 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 -}
 
 module Ganeti.JQueue.LockDecls
-    (staticWeight) where
+    ( staticWeight
+    , adjustedWeight
+    ) where
 
 import Data.List (foldl')
 import Data.Maybe (isNothing, fromMaybe, catMaybes, isJust, fromJust)
@@ -48,6 +50,7 @@ import qualified Ganeti.Constants as C
 import Ganeti.Errors
 import Ganeti.Objects (ConfigData, nodeName, instName, groupName)
 import qualified Ganeti.Objects.Instance as ObjInst
+import qualified Ganeti.JQueue.Objects as JQ
 import Ganeti.OpCodes
 import Ganeti.Types
 
@@ -589,3 +592,12 @@ staticWeight cfg op runningOps
         maxValue = C.staticLockSureBlockWeight * 5 -- Worst case scenario
                                                    -- multiplied by all 5 lock
                                                    -- levels
+
+-- | Adjust any static weight to its appropriate anti-starvation value. It
+-- takes the current time and the time since the job was queued as parameters.
+adjustedWeight :: JQ.Timestamp -> JQ.Timestamp -> Double -> Double
+adjustedWeight (currTime, _) (recvTime, _) weight =
+  let timeVal = max 0 (1 - ticks / C.jobQueueStarvationCoeff)
+      ticks = timeDiff / C.jobQueueTickInSeconds
+      timeDiff = fromIntegral $ currTime - recvTime
+  in weight * timeVal