Implement predictive queue cluster parameter
authorFederico Morg Pareschi <morg@google.com>
Mon, 9 Jan 2017 11:17:58 +0000 (11:17 +0000)
committerFederico Morg Pareschi <morg@google.com>
Mon, 9 Jan 2017 12:06:12 +0000 (12:06 +0000)
This commit adds the enabled_predictive_queue cluster parameter that
allows the cluster to specify if the predictive scheduler should be used
or not when ordering ganeti jobs in the queue. It also adds the correct
command line flags for the cluster init and cluster modify ganeti jobs.

As a default, if no option is specified upon cluster creation, the
predictive scheduler is automatically enabled.

Signed-off-by: Federico Morg Pareschi <morg@google.com>
Reviewed-by: Viktor Bachraty <vbachraty@google.com>

12 files changed:
lib/bootstrap.py
lib/cli_opts.py
lib/client/gnt_cluster.py
lib/cmdlib/cluster/__init__.py
lib/objects.py
man/gnt-cluster.rst
src/Ganeti/JQScheduler.hs
src/Ganeti/Objects.hs
src/Ganeti/OpCodes.hs
src/Ganeti/OpParams.hs
src/Ganeti/Query/Server.hs
test/hs/Test/Ganeti/OpCodes.hs

index 2f98fdd..c4d9871 100644 (file)
@@ -501,7 +501,7 @@ def InitCluster(cluster_name, mac_prefix, # pylint: disable=R0913, R0914
                 use_external_mip_script=False, hv_state=None, disk_state=None,
                 enabled_disk_templates=None, install_image=None,
                 zeroing_image=None, compression_tools=None,
-                enabled_user_shutdown=False):
+                enabled_user_shutdown=False, enabled_predictive_queue=True):
   """Initialise the cluster.
 
   @type candidate_pool_size: int
@@ -805,6 +805,7 @@ def InitCluster(cluster_name, mac_prefix, # pylint: disable=R0913, R0914
     enabled_user_shutdown=enabled_user_shutdown,
     ssh_key_type=ssh_key_type,
     ssh_key_bits=ssh_key_bits,
+    enabled_predictive_queue=enabled_predictive_queue,
     )
   master_node_config = objects.Node(name=hostname.name,
                                     primary_ip=hostname.ip,
index f90594a..984536a 100644 (file)
@@ -88,6 +88,7 @@ __all__ = [
   "DIAGNOSE_DATA_COLLECTOR_FILENAME_OPT",
   "ENABLED_DISK_TEMPLATES_OPT",
   "ENABLED_HV_OPT",
+  "ENABLED_PREDICTIVE_QUEUE_OPT",
   "ENABLED_USER_SHUTDOWN_OPT",
   "ERROR_CODES_OPT",
   "EXT_PARAMS_OPT",
@@ -1118,6 +1119,13 @@ ENABLED_DISK_TEMPLATES_OPT = cli_option("--enabled-disk-templates",
                                              "disk templates",
                                         type="string", default=None)
 
+ENABLED_PREDICTIVE_QUEUE_OPT = cli_option("--predictive-queue",
+                                          default=None,
+                                          dest="enabled_predictive_queue",
+                                          help="Whether the predictive queue is"
+                                               "enabled",
+                                          type="bool")
+
 ENABLED_USER_SHUTDOWN_OPT = cli_option("--user-shutdown",
                                        default=None,
                                        dest="enabled_user_shutdown",
index cbb3af1..15208ca 100644 (file)
@@ -309,6 +309,11 @@ def InitCluster(opts, args):
 
   enabled_user_shutdown = bool(opts.enabled_user_shutdown)
 
+  if opts.enabled_predictive_queue  is not None:
+    enabled_predictive_queue = bool(opts.enabled_predictive_queue)
+  else:
+    enabled_predictive_queue = True # Predictive queue is enabled by default.
+
   if opts.ssh_key_type:
     ssh_key_type = opts.ssh_key_type
   else:
@@ -353,6 +358,7 @@ def InitCluster(opts, args):
                         enabled_user_shutdown=enabled_user_shutdown,
                         ssh_key_type=ssh_key_type,
                         ssh_key_bits=ssh_key_bits,
+                        enabled_predictive_queue=enabled_predictive_queue,
                         )
   op = opcodes.OpClusterPostInit()
   SubmitOpCode(op, opts=opts)
@@ -635,6 +641,7 @@ def ShowClusterConfig(opts, args):
       ("modify ssh setup", result["modify_ssh_setup"]),
       ("ssh_key_type", result["ssh_key_type"]),
       ("ssh_key_bits", result["ssh_key_bits"]),
+      ("enabled predictive queue", result["enabled_predictive_queue"])
       ]),
 
     ("Default node parameters",
@@ -1416,7 +1423,8 @@ def SetClusterParams(opts, args):
           opts.maint_balance_threshold is not None or
           opts.data_collector_interval or
           opts.diagnose_data_collector_filename is not None or
-          opts.enabled_data_collectors):
+          opts.enabled_data_collectors or
+          opts.enabled_predictive_queue is not None):
     ToStderr("Please give at least one of the parameters.")
     return 1
 
@@ -1567,7 +1575,8 @@ def SetClusterParams(opts, args):
     maint_balance_threshold=opts.maint_balance_threshold,
     enabled_data_collectors=enabled_data_collectors,
     data_collector_interval=data_collector_interval,
-    diagnose_data_collector_filename=opts.diagnose_data_collector_filename
+    diagnose_data_collector_filename=opts.diagnose_data_collector_filename,
+    enabled_predictive_queue=opts.enabled_predictive_queue
     )
   return base.GetResult(None, opts, SubmitOrSend(op, opts))
 
@@ -2506,6 +2515,7 @@ commands = {
      IPOLICY_STD_SPECS_OPT, GLOBAL_GLUSTER_FILEDIR_OPT, INSTALL_IMAGE_OPT,
      ZEROING_IMAGE_OPT, COMPRESSION_TOOLS_OPT,
      ENABLED_USER_SHUTDOWN_OPT, SSH_KEY_BITS_OPT, SSH_KEY_TYPE_OPT,
+     ENABLED_PREDICTIVE_QUEUE_OPT,
      ]
      + INSTANCE_POLICY_OPTS + SPLIT_ISPECS_OPTS,
     "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
@@ -2591,7 +2601,8 @@ commands = {
      PREALLOC_WIPE_DISKS_OPT, NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT,
      DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT] + SUBMIT_OPTS +
      [ENABLED_DISK_TEMPLATES_OPT, IPOLICY_STD_SPECS_OPT, MODIFY_ETCHOSTS_OPT,
-      MODIFY_SSH_SETUP_OPT, ENABLED_USER_SHUTDOWN_OPT] +
+      MODIFY_SSH_SETUP_OPT, ENABLED_USER_SHUTDOWN_OPT,
+      ENABLED_PREDICTIVE_QUEUE_OPT] +
      INSTANCE_POLICY_OPTS +
      [GLOBAL_FILEDIR_OPT, GLOBAL_SHARED_FILEDIR_OPT, ZEROING_IMAGE_OPT,
       COMPRESSION_TOOLS_OPT] +
index 28370d9..455c1de 100644 (file)
@@ -1770,6 +1770,9 @@ class LUClusterSetParams(LogicalUnit):
       self.cluster.enabled_user_shutdown = self.op.enabled_user_shutdown
       ensure_kvmd = True
 
+    if self.op.enabled_predictive_queue is not None:
+      self.cluster.enabled_predictive_queue = self.op.enabled_predictive_queue
+
     def helper_os(aname, mods, desc):
       desc += " OS list"
       lst = getattr(self.cluster, aname)
index 7e20fc2..df0494c 100644 (file)
@@ -1710,6 +1710,7 @@ class Cluster(TaggableObject):
     "diagnose_data_collector_filename",
     "ssh_key_type",
     "ssh_key_bits",
+    "enabled_predictive_queue",
     ] + _TIMESTAMPS + _UUID
 
   def UpgradeConfig(self):
index 0469f75..261019d 100644 (file)
@@ -209,6 +209,7 @@ INIT
 | [\--user-shutdown {yes \| no}]
 | [\--ssh-key-type *type*]
 | [\--ssh-key-bits *bits*]
+| [\--predictive-queue {yes \| no}]
 | {*clustername*}
 
 This commands is only run once initially on the first node of the
@@ -651,6 +652,10 @@ options **ssh-keygen**\(1) exposes. These are currently:
 
 Ganeti defaults to using 2048-bit RSA keys.
 
+The ``--predictive-queue`` option enables or disables the predictive
+queue algorithm for the job scheduler. If this option is not specified,
+Ganeti defaults to enabling the predictive scheduler.
+
 MASTER-FAILOVER
 ~~~~~~~~~~~~~~~
 
@@ -751,6 +756,8 @@ MODIFY
 | [\--auto-balance-cluster {yes \| no }]
 | [\--auto-balance-threshold *score* ]
 | [\--diagnose-data-collector-filename *filename*]
+| [\--predictive-queue {yes \| no}]
+
 
 
 Modify the options for the cluster.
@@ -840,6 +847,9 @@ in absolute terms, unless the cluster score it at least 10 times that
 value, in which case all beneficial steps will be done if auto-balancing
 is enabled.
 
+The ``--predictive-queue`` option enables or disables the predictive
+queue algorithm for the job scheduler.
+
 See **gnt-cluster init** for a description of ``--install-image`` and
 ``--zeroing-image``.
 
index 5c79843..bfa7874 100644 (file)
@@ -167,6 +167,12 @@ getMaxRunningJobs = getConfigValue clusterMaxRunningJobs 1
 getMaxTrackedJobs :: JQStatus -> IO Int
 getMaxTrackedJobs = getConfigValue clusterMaxTrackedJobs 1
 
+-- | Get the boolean that specifies whether or not the predictive queue
+-- scheduler is enabled in the cluster. If the configuration is not available,
+-- the predictive queue is enabled by default.
+getEnabledPredictiveQueue :: JQStatus -> IO Bool
+getEnabledPredictiveQueue = getConfigValue clusterEnabledPredictiveQueue True
+
 -- | Get the number of jobs currently running.
 getRQL :: JQStatus -> IO Int
 getRQL = liftM (length . qRunning) . readIORef . jqJobs
@@ -348,18 +354,22 @@ sortByStaticLocks cfg queue currTime = sortBy (compare `on` opWeight)
 -- pure function doing the scheduling.
 selectJobsToRun :: ConfigData
                 -> Int -- How many jobs are allowed to run at the same time.
+                -> Bool -- If the predictive scheduler is enabled
                 -> Timestamp -- Current time
                 -> Set FilterRule -- Filter rules to respect for scheduling
                 -> Queue
                 -> (Queue, [JobWithStat])
-selectJobsToRun cfg count currTime filters queue =
+selectJobsToRun cfg count isPredictive currTime filters queue =
   let n = count - length (qRunning queue) - length (qManipulated queue)
+      pickScheduler = if isPredictive
+                         then sortByStaticLocks cfg queue currTime
+                         else id
       chosen = take n
                . jobFiltering queue filters
                . reasonRateLimit queue
                . sortBy (comparing (calcJobPriority . jJob))
                . filter (jobEligible queue)
-               . sortByStaticLocks cfg queue currTime
+               . pickScheduler
                $ qEnqueued queue
       remain = deleteFirstsBy ((==) `on` (qjId . jJob)) (qEnqueued queue) chosen
   in (queue {qEnqueued=remain, qRunning=qRunning queue ++ chosen}, chosen)
@@ -456,8 +466,10 @@ scheduleSomeJobs qstate = do
 
       -- Select the jobs to run.
       count <- getMaxRunningJobs qstate
-      chosen <- atomicModifyIORef (jqJobs qstate)
-                                  (selectJobsToRun cfg count ts filters)
+      isPredictive <- getEnabledPredictiveQueue qstate
+      let jobsToRun = selectJobsToRun cfg count isPredictive ts filters
+      chosen <- atomicModifyIORef (jqJobs qstate) jobsToRun
+
       let jobs = map jJob chosen
       unless (null chosen) . logInfo . (++) "Starting jobs: " . commaJoin
         $ map (show . fromJobId . qjId) jobs
index 5be8adf..572dc66 100644 (file)
@@ -698,6 +698,7 @@ $(buildObject "Cluster" "cluster" $
       "diagnose_data_collector_filename"         [t| String                  |]
   , simpleField "ssh_key_type"                   [t| SshKeyType              |]
   , simpleField "ssh_key_bits"                   [t| Int                     |]
+  , simpleField "enabled_predictive_queue"       [t| Bool                    |]
  ]
  ++ timeStampFields
  ++ uuidFields
index c4da480..811d59d 100644 (file)
@@ -271,6 +271,7 @@ $(genOpCode "OpCode"
      , pMaintdRoundDelay
      , pMaintdEnableBalancing
      , pMaintdBalancingThreshold
+     , pEnabledPredictiveQueue
      ],
      [])
   , ("OpClusterRedistConf",
index b5a5619..f35ae45 100644 (file)
@@ -321,6 +321,7 @@ module Ganeti.OpParams
   , pVerifyClutter
   , pLongSleep
   , pIsStrict
+  , pEnabledPredictiveQueue
   ) where
 
 import Control.Monad (liftM, mplus)
@@ -2030,3 +2031,9 @@ pIsStrict =
   withDoc "Whether the operation is in strict mode or not." .
   defaultField [| True |] $
   simpleField "is_strict" [t| Bool |]
+
+pEnabledPredictiveQueue :: Field
+pEnabledPredictiveQueue =
+  withDoc "Whether the predictive queue is enabled in the cluster." .
+  optionalField $
+  simpleField "enabled_predictive_queue" [t| Bool |]
index aefe129..8cef6cc 100644 (file)
@@ -289,6 +289,8 @@ handleCall _ _ cdata QueryClusterInfo =
                showJSON $ clusterModifySshSetup cluster)
             , ("ssh_key_type", showJSON $ clusterSshKeyType cluster)
             , ("ssh_key_bits", showJSON $ clusterSshKeyBits cluster)
+            , ("enabled_predictive_queue",
+               showJSON $ clusterEnabledPredictiveQueue cluster)
             ]
 
   in case master of
index 48a4683..7d39b66 100644 (file)
@@ -265,6 +265,7 @@ genOpCodeFromId op_id cfg =
         <*> genMaybe (fromPositive <$> arbitrary) -- maintd round interval
         <*> genMaybe arbitrary           -- enable maintd balancing
         <*> genMaybe arbitrary           -- maintd balancing threshold
+        <*> arbitrary                    -- enabled_predictive_queue
     "OP_CLUSTER_REDIST_CONF" -> pure OpCodes.OpClusterRedistConf
     "OP_CLUSTER_ACTIVATE_MASTER_IP" ->
       pure OpCodes.OpClusterActivateMasterIp