8c9808365ca3608fc2dfecf67a4b1753584ef399
[ganeti-github.git] / src / Ganeti / JQScheduler / Filtering.hs
1 {-# LANGUAGE TupleSections, NamedFieldPuns, ScopedTypeVariables, RankNTypes,
2 GADTs #-}
3 {-| Filtering of jobs for the Ganeti job queue.
4
5 -}
6
7 {-
8
9 Copyright (C) 2014 Google Inc.
10 All rights reserved.
11
12 Redistribution and use in source and binary forms, with or without
13 modification, are permitted provided that the following conditions are
14 met:
15
16 1. Redistributions of source code must retain the above copyright notice,
17 this list of conditions and the following disclaimer.
18
19 2. Redistributions in binary form must reproduce the above copyright
20 notice, this list of conditions and the following disclaimer in the
21 documentation and/or other materials provided with the distribution.
22
23 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
24 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
25 TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
26 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
27 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
28 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
29 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34
35 -}
36
37 module Ganeti.JQScheduler.Filtering
38 ( applyingFilter
39 , jobFiltering
40 -- * For testing only
41 , matchPredicate
42 , matches
43 ) where
44
45 import Data.List
46 import Data.Maybe
47 import qualified Data.Map as Map
48 import Data.Set (Set)
49 import qualified Data.Set as Set
50 import qualified Text.JSON as J
51
52 import Ganeti.BasicTypes
53 import Ganeti.Errors
54 import Ganeti.Lens hiding (chosen)
55 import Ganeti.JQScheduler.Types
56 import Ganeti.JQueue (QueuedJob(..))
57 import Ganeti.JQueue.Lens
58 import Ganeti.JSON
59 import Ganeti.Objects (FilterRule(..), FilterAction(..), FilterPredicate(..),
60 filterRuleOrder)
61 import Ganeti.OpCodes (OpCode)
62 import Ganeti.OpCodes.Lens
63 import Ganeti.Query.Language
64 import Ganeti.Query.Filter (evaluateFilterM, evaluateFilterJSON, Comparator,
65 FilterOp(..), toCompFun)
66 import Ganeti.SlotMap
67 import Ganeti.Types (JobId(..), ReasonElem)
68
69
70 -- | Accesses a field of the JSON representation of an `OpCode` using a dotted
71 -- accessor (like @"a.b.c"@).
72 accessOpCodeField :: OpCode -> String -> ErrorResult J.JSValue
73 accessOpCodeField opc s = case nestedAccessByKeyDotted s (J.showJSON opc) of
74 J.Ok x -> Ok x
75 J.Error e -> Bad . ParameterError $ e
76
77
78 -- | All `OpCode`s of a job.
79 opCodesOf :: QueuedJob -> [OpCode]
80 opCodesOf job =
81 job ^.. qjOpsL . traverse . qoInputL . validOpCodeL . metaOpCodeL
82
83
84 -- | All `ReasonElem`s of a job.
85 reasonsOf :: QueuedJob -> [ReasonElem]
86 reasonsOf job = job ^.. qjOpsL . traverse . qoInputL . validOpCodeL
87 . metaParamsL . opReasonL . traverse
88
89
90 -- | Like `evaluateFilterM`, but allowing only `Comparator` operations;
91 -- all other filter language operations are evaluated as `False`.
92 --
93 -- The passed function is supposed to return `Just True/False` depending
94 -- on whether the comparing operation succeeds or not, and `Nothing` if
95 -- the comparison itself is invalid (e.g. comparing to a field that doesn't
96 -- exist).
97 evaluateFilterComparator :: (Ord field)
98 => Filter field
99 -> (Comparator -> field -> FilterValue -> Maybe Bool)
100 -> Bool
101 evaluateFilterComparator fil opFun =
102 fromMaybe False $
103 evaluateFilterM
104 (\filterOp -> case filterOp of
105 Comp cmp -> opFun (toCompFun cmp)
106 _ -> \_ _ -> Nothing -- non-comparisons (become False)
107 )
108 fil
109
110
111 -- | Whether a `FilterPredicate` is true for a job.
112 matchPredicate :: QueuedJob
113 -> JobId -- ^ the watermark to compare against
114 -- if the predicate references it
115 -> FilterPredicate
116 -> Bool
117 matchPredicate job watermark predicate = case predicate of
118
119 FPJobId fil ->
120 let jid = qjId job
121 jidInt = fromIntegral (fromJobId jid)
122
123 in evaluateFilterComparator fil $ \comp field val -> case field of
124 "id" -> case val of
125 NumericValue i -> Just $ jidInt `comp` i
126 QuotedString "watermark" -> Just $ jid `comp` watermark
127 QuotedString _ -> Nothing
128 _ -> Nothing
129
130 FPOpCode fil ->
131 let opMatches opc = genericResult (const False) id $ do
132 jsonFilter <- traverse (accessOpCodeField opc) fil
133 evaluateFilterJSON jsonFilter
134 in any opMatches (opCodesOf job)
135
136 FPReason fil ->
137 let reasonMatches (source, reason, timestamp) =
138 evaluateFilterComparator fil $ \comp field val -> case field of
139 "source" -> Just $ QuotedString source `comp` val
140 "reason" -> Just $ QuotedString reason `comp` val
141 "timestamp" -> Just $ NumericValue timestamp `comp` val
142 _ -> Nothing
143 in any reasonMatches (reasonsOf job)
144
145
146 -- | Whether all predicates of the filter rule are true for the job.
147 matches :: QueuedJob -> FilterRule -> Bool
148 matches job FilterRule{ frPredicates, frWatermark } =
149 all (matchPredicate job frWatermark) frPredicates
150
151
152 -- | Filters need to be processed in the order as given by the spec;
153 -- see `filterRuleOrder`.
154 orderFilters :: Set FilterRule -> [FilterRule]
155 orderFilters = sortBy filterRuleOrder . Set.toList
156
157
158 -- | Finds the first filter whose predicates all match the job and whose
159 -- action is not `Continue`. This is the /applying/ filter.
160 applyingFilter :: Set FilterRule -> QueuedJob -> Maybe FilterRule
161 applyingFilter filters job =
162 -- Skip over all `Continue`s, to the first filter that matches.
163 find ((Continue /=) . frAction)
164 . filter (matches job)
165 . orderFilters
166 $ filters
167
168
169 -- | SlotMap for filter rule rate limiting, having `FilterRule` UUIDs as keys.
170 type RateLimitSlotMap = SlotMap String
171 -- We would prefer FilterRule here but that has no Ord instance (yet).
172
173
174 -- | State to be accumulated while traversing filters.
175 data FilterChainState = FilterChainState
176 { rateLimitSlotMap :: RateLimitSlotMap -- ^ counts
177 } deriving (Eq, Ord, Show)
178
179
180 -- | Update a `FilterChainState` if the given `CountMap` fits into its
181 -- filtering SlotsMap.
182 tryFitSlots :: FilterChainState -> CountMap String -> Maybe FilterChainState
183 tryFitSlots st@FilterChainState{ rateLimitSlotMap = slotMap } countMap =
184 if slotMap `hasSlotsFor` countMap
185 then Just st{ rateLimitSlotMap = slotMap `occupySlots` countMap }
186 else Nothing
187
188
189 -- | For a given job queue and set of filters, calculates how many rate
190 -- limiting filter slots are available and how many are taken by running jobs
191 -- in the queue.
192 queueRateLimitSlotMap :: Queue -> Set FilterRule -> RateLimitSlotMap
193 queueRateLimitSlotMap queue filters =
194 let -- Rate limiting slots for each filter, with 0 occupied count each
195 -- (limits only).
196 emptyFilterSlots =
197 Map.fromList
198 [ (uuid, Slot 0 n)
199 | FilterRule{ frUuid = uuid
200 , frAction = RateLimit n } <- Set.toList filters ]
201
202 -- How many rate limiting slots are taken by the jobs currently running
203 -- in the queue jobs (counts only).
204 -- A job takes a slot of a RateLimit filter if that filter is the first
205 -- one that matches for the job.
206 runningJobSlots = Map.fromListWith (+)
207 [ (frUuid, 1) | Just FilterRule{ frUuid, frAction = RateLimit _ } <-
208 map (applyingFilter filters . jJob)
209 $ qRunning queue ++ qManipulated queue ]
210
211 in -- Fill limits from above with counts from above.
212 emptyFilterSlots `occupySlots` runningJobSlots
213
214
215 -- | Implements job filtering as specified in `doc/design-optables.rst`.
216 --
217 -- Importantly, the filter that *applies* is the first one of which all
218 -- predicates match; this is implemented in `applyingFilter`.
219 --
220 -- The initial `FilterChainState` is currently not cached across
221 -- `selectJobsToRun` invocations because the number of running jobs is
222 -- typically small (< 100).
223 jobFiltering :: Queue -> Set FilterRule -> [JobWithStat] -> [JobWithStat]
224 jobFiltering queue filters =
225 let
226 processFilters :: FilterChainState
227 -> JobWithStat
228 -> (FilterChainState, Maybe JobWithStat)
229 processFilters state job =
230 case applyingFilter filters (jJob job) of
231 Nothing -> (state, Just job) -- no filter applies, accept job
232 Just FilterRule{ frUuid, frAction } -> case frAction of
233 Accept -> (state, Just job)
234 Continue -> (state, Just job)
235 Pause -> (state, Nothing)
236 Reject -> (state, Nothing)
237 RateLimit _ -> -- A matching job takes 1 slot.
238 let jobSlots = Map.fromList [(frUuid, 1)]
239 in case tryFitSlots state jobSlots of
240 Nothing -> (state, Nothing)
241 Just state' -> (state', Just job)
242
243 in catMaybes . snd . mapAccumL processFilters FilterChainState
244 { rateLimitSlotMap = queueRateLimitSlotMap queue filters
245 }