Make config distribution parametric in target group
authorKlaus Aehlig <aehlig@google.com>
Mon, 8 Feb 2016 15:00:45 +0000 (16:00 +0100)
committerKlaus Aehlig <aehlig@google.com>
Wed, 10 Feb 2016 09:59:03 +0000 (10:59 +0100)
Sometimes, it is sufficient to force distribution of the configuration
only to a single target group. Hence make the infrastructure parametric
in the groups the configuration should be distributed to.

Signed-off-by: Klaus Aehlig <aehlig@google.com>
Reviewed-by: Brian Foley <bpfoley@google.com>

src/Ganeti/WConfd/ConfigWriter.hs
src/Ganeti/WConfd/Core.hs
src/Ganeti/WConfd/Monad.hs

index f3dd8dd..8ffbc13 100644 (file)
@@ -49,6 +49,7 @@ import Control.Monad.Error
 import qualified Control.Monad.State.Strict as S
 import Control.Monad.Trans.Control
 import Data.Monoid
+import qualified Data.Set as Set
 
 import Ganeti.BasicTypes
 import Ganeti.Errors
@@ -64,6 +65,12 @@ import Ganeti.WConfd.ConfigState
 import Ganeti.WConfd.Monad
 import Ganeti.WConfd.Ssconf
 
+-- | From a distribution target get a predicate on nodes whether it
+-- should be distributed to this node.
+targetToPredicate :: DistributionTarget -> Node -> Bool
+targetToPredicate Everywhere = const True
+targetToPredicate (ToGroups gs) = (`Set.member` gs) . nodeGroup
+
 -- | Loads the configuration from the file, if it hasn't been loaded yet.
 -- The function is internal and isn't thread safe.
 loadConfigFromFile :: FilePath
@@ -147,19 +154,19 @@ mkStatefulAsyncTask logPrio logPrefix start action =
 saveConfigAsyncTask :: FilePath -- ^ Path to the config file
                     -> FStat  -- ^ The initial state of the config. file
                     -> IO ConfigState -- ^ An action to read the current config
-                    -> [AsyncWorker () ()] -- ^ Workers to be triggered
-                                           -- afterwards
-                    -> ResultG (AsyncWorker Any ())
+                    -> [AsyncWorker DistributionTarget ()]
+                    -- ^ Workers to be triggered afterwards
+                    -> ResultG (AsyncWorker (Any, DistributionTarget) ())
 saveConfigAsyncTask fpath fstat cdRef workers =
   lift . mkStatefulAsyncTask
            EMERGENCY "Can't write the master configuration file" fstat
-       $ \oldstat (Any flush) -> do
+       $ \oldstat (Any flush, target) -> do
             cd <- liftBase (csConfigData `liftM` cdRef)
             writeConfigToFile cd fpath oldstat
               <* if flush then logDebug "Running distribution synchronously"
-                               >> triggerAndWaitMany_ workers
+                               >> triggerAndWaitMany target workers
                           else logDebug "Running distribution asynchronously"
-                               >> mapM trigger_ workers
+                               >> mapM (trigger target) workers
 
 
 -- | Performs a RPC call on the given list of nodes and logs any failures.
@@ -175,16 +182,17 @@ execRpcCallAndLog nodes req = do
 distMCsAsyncTask :: RuntimeEnts
                  -> FilePath -- ^ Path to the config file
                  -> IO ConfigState -- ^ An action to read the current config
-                 -> ResultG (AsyncWorker () ())
+                 -> ResultG (AsyncWorker DistributionTarget ())
 distMCsAsyncTask ents cpath cdRef =
   lift . mkStatelessAsyncTask ERROR "Can't distribute the configuration\
                                     \ to master candidates"
-       $ \_ -> do
+       $ \target -> do
           cd <- liftBase (csConfigData <$> cdRef) :: ResultG ConfigData
           logDebug $ "Distributing the configuration to master candidates,\
-                     \ serial no " ++ show (serialOf cd)
+                     \ serial no " ++ show (serialOf cd) ++ ", " ++ show target
           fupload <- prepareRpcCallUploadFile ents cpath
-          execRpcCallAndLog (getMasterCandidates cd) fupload
+          execRpcCallAndLog
+            (filter (targetToPredicate target) $ getMasterCandidates cd) fupload
           logDebug "Successfully finished distributing the configuration"
 
 -- | Construct an asynchronous worker whose action is to construct SSConf
@@ -194,10 +202,10 @@ distMCsAsyncTask ents cpath cdRef =
 -- if different, distributes it.
 distSSConfAsyncTask
     :: IO ConfigState -- ^ An action to read the current config
-    -> ResultG (AsyncWorker () ())
+    -> ResultG (AsyncWorker DistributionTarget ())
 distSSConfAsyncTask cdRef =
   lift . mkStatefulAsyncTask ERROR "Can't distribute Ssconf" emptySSConf
-       $ \oldssc _ -> do
+       $ \oldssc target -> do
             cd <- liftBase (csConfigData <$> cdRef) :: ResultG ConfigData
             let ssc = mkSSConf cd
             if oldssc == ssc
@@ -205,7 +213,9 @@ distSSConfAsyncTask cdRef =
               else do
                 logDebug $ "Starting the distribution of SSConf\
                            \ serial no " ++ show (serialOf cd)
-                execRpcCallAndLog (getOnlineNodes cd)
+                           ++ ", " ++ show target
+                execRpcCallAndLog (filter (targetToPredicate target)
+                                    $ getOnlineNodes cd)
                                   (RpcCallWriteSsconfFiles ssc)
                 logDebug "Successfully finished distributing SSConf"
             return ssc
index 8c0306c..6fc09c9 100644 (file)
@@ -158,7 +158,7 @@ writeConfigAndUnlock cid cdata = do
 -- | Force the distribution of configuration without actually modifying it.
 -- It is not necessary to hold a lock for this operation.
 flushConfig :: WConfdMonad ()
-flushConfig = forceConfigStateDistribution
+flushConfig = forceConfigStateDistribution Everywhere
 
 -- ** Temporary reservations related functions
 
index 93bec0e..fe78e31 100644 (file)
@@ -66,6 +66,7 @@ module Ganeti.WConfd.Monad
   , modifyTempResState
   , modifyTempResStateErr
   , readTempResState
+  , DistributionTarget(..)
   ) where
 
 import Control.Applicative
@@ -80,7 +81,7 @@ import Control.Monad.State
 import Control.Monad.Trans.Control
 import Data.Functor.Identity
 import Data.IORef.Lifted
-import Data.Monoid (Any(..))
+import Data.Monoid (Any(..), Monoid(..))
 import qualified Data.Set as S
 import Data.Tuple (swap)
 import System.Posix.Process (getProcessID)
@@ -103,6 +104,17 @@ import Ganeti.Utils.Livelock (Livelock)
 import Ganeti.WConfd.ConfigState
 import Ganeti.WConfd.TempRes
 
+-- * Monoid of the locations to flush the configuration to
+
+-- | Data type describing where the configuration has to be distributed to.
+data DistributionTarget = Everywhere | ToGroups (S.Set String) deriving Show
+
+instance Monoid DistributionTarget where
+  mempty = ToGroups S.empty
+  mappend Everywhere _ = Everywhere
+  mappend _ Everywhere = Everywhere
+  mappend (ToGroups a) (ToGroups b) = ToGroups (a `S.union` b)
+
 -- * Pure data types used in the monad
 
 -- | The state of the daemon, capturing both the configuration state and the
@@ -121,7 +133,7 @@ data DaemonHandle = DaemonHandle
   -- all static information that doesn't change during the life-time of the
   -- daemon should go here;
   -- all IDs of threads that do asynchronous work should probably also go here
-  , dhSaveConfigWorker :: AsyncWorker Any ()
+  , dhSaveConfigWorker :: AsyncWorker (Any, DistributionTarget) ()
   , dhSaveLocksWorker :: AsyncWorker () ()
   , dhSaveTempResWorker :: AsyncWorker () ()
   , dhLivelock :: Livelock
@@ -131,14 +143,17 @@ mkDaemonHandle :: FilePath
                -> ConfigState
                -> GanetiLockWaiting
                -> TempResState
-               -> (IO ConfigState -> [AsyncWorker () ()]
-                                  -> ResultG (AsyncWorker Any ()))
+               -> (IO ConfigState
+                   -> [AsyncWorker DistributionTarget ()]
+                   -> ResultG (AsyncWorker (Any, DistributionTarget) ()))
                   -- ^ A function that creates a worker that asynchronously
                   -- saves the configuration to the master file.
-               -> (IO ConfigState -> ResultG (AsyncWorker () ()))
+               -> (IO ConfigState
+                   -> ResultG (AsyncWorker DistributionTarget ()))
                   -- ^ A function that creates a worker that asynchronously
                   -- distributes the configuration to master candidates
-               -> (IO ConfigState -> ResultG (AsyncWorker () ()))
+               -> (IO ConfigState
+                   -> ResultG (AsyncWorker DistributionTarget ()))
                   -- ^ A function that creates a worker that asynchronously
                   -- distributes SSConf to nodes
                -> (IO GanetiLockWaiting -> ResultG (AsyncWorker () ()))
@@ -248,7 +263,8 @@ modifyConfigStateErrWithImmediate f immediateFollowup = do
       then do
         logDebug $ "Triggering config write" ++
                    " together with full synchronous distribution"
-        res <- liftBase . triggerWithResult (Any True) $ dhSaveConfigWorker dh
+        res <- liftBase . triggerWithResult (Any True, Everywhere)
+                 $ dhSaveConfigWorker dh
         immediateFollowup
         wait res
         logDebug "Config write and distribution finished"
@@ -256,7 +272,8 @@ modifyConfigStateErrWithImmediate f immediateFollowup = do
         -- trigger the config. saving worker and wait for it
         logDebug $ "Triggering config write" ++
                    " and asynchronous distribution"
-        res <- liftBase . triggerWithResult (Any False) $ dhSaveConfigWorker dh
+        res <- liftBase . triggerWithResult (Any False, Everywhere)
+                 $ dhSaveConfigWorker dh
         immediateFollowup
         wait res
         logDebug "Config writer finished with local task"
@@ -295,11 +312,11 @@ modifyConfigStateWithImmediate f =
 --
 -- We need a separate call for this operation, because 'modifyConfigState' only
 -- triggers the distribution when the configuration changes.
-forceConfigStateDistribution :: WConfdMonad ()
-forceConfigStateDistribution  = do
+forceConfigStateDistribution :: DistributionTarget -> WConfdMonad ()
+forceConfigStateDistribution target = do
   logDebug "Forcing synchronous config write together with full distribution"
   dh <- daemonHandle
-  liftBase . triggerAndWait (Any True) . dhSaveConfigWorker $ dh
+  liftBase . triggerAndWait (Any True, target) . dhSaveConfigWorker $ dh
   logDebug "Forced config write and distribution finished"
 
 -- | Atomically modifies the configuration data in the WConfdMonad
@@ -421,7 +438,7 @@ modifyConfigAndReturnWithLock f tempres = do
     when modified $ do
       logDebug . (++) "Triggering config write; distribution "
         $ if dist then "synchronously" else "asynchronously"
-      liftBase . triggerAndWait (Any dist) $ dhSaveConfigWorker dh
+      liftBase . triggerAndWait (Any dist, Everywhere) $ dhSaveConfigWorker dh
       logDebug "Config write finished"
     logDebug "Triggering temporary reservations write"
     liftBase . triggerAndWait_ . dhSaveTempResWorker $ dh