Encode UUIDs as ByteStrings
[ganeti-github.git] / src / Ganeti / JQScheduler / Filtering.hs
1 {-# LANGUAGE TupleSections, NamedFieldPuns, ScopedTypeVariables, RankNTypes,
2 GADTs #-}
3 {-| Filtering of jobs for the Ganeti job queue.
4
5 -}
6
7 {-
8
9 Copyright (C) 2014 Google Inc.
10 All rights reserved.
11
12 Redistribution and use in source and binary forms, with or without
13 modification, are permitted provided that the following conditions are
14 met:
15
16 1. Redistributions of source code must retain the above copyright notice,
17 this list of conditions and the following disclaimer.
18
19 2. Redistributions in binary form must reproduce the above copyright
20 notice, this list of conditions and the following disclaimer in the
21 documentation and/or other materials provided with the distribution.
22
23 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
24 IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
25 TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
26 PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
27 CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
28 EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
29 PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
30 PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
31 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
32 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
33 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34
35 -}
36
37 module Ganeti.JQScheduler.Filtering
38 ( applyingFilter
39 , jobFiltering
40 -- * For testing only
41 , matchPredicate
42 , matches
43 ) where
44
45 import qualified Data.ByteString as BS
46 import Data.List
47 import Data.Maybe
48 import qualified Data.Map as Map
49 import Data.Set (Set)
50 import qualified Data.Set as Set
51 import qualified Text.JSON as J
52
53 import Ganeti.BasicTypes
54 import Ganeti.Errors
55 import Ganeti.Lens hiding (chosen)
56 import Ganeti.JQScheduler.Types
57 import Ganeti.JQueue (QueuedJob(..))
58 import Ganeti.JQueue.Lens
59 import Ganeti.JSON
60 import Ganeti.Objects (FilterRule(..), FilterAction(..), FilterPredicate(..),
61 filterRuleOrder)
62 import Ganeti.OpCodes (OpCode)
63 import Ganeti.OpCodes.Lens
64 import Ganeti.Query.Language
65 import Ganeti.Query.Filter (evaluateFilterM, evaluateFilterJSON, Comparator,
66 FilterOp(..), toCompFun)
67 import Ganeti.SlotMap
68 import Ganeti.Types (JobId(..), ReasonElem)
69
70
-- | Looks up a field in the JSON representation of an `OpCode`. The field
-- is addressed by a dotted accessor path (like @"a.b.c"@).
--
-- A failed lookup is reported as a `ParameterError` carrying the message
-- of the JSON error.
accessOpCodeField :: OpCode -> String -> ErrorResult J.JSValue
accessOpCodeField opc path =
  fromJsonResult $ nestedAccessByKeyDotted path (J.showJSON opc)
  where
    -- Translate the JSON library's result type into an `ErrorResult`.
    fromJsonResult (J.Ok value)  = Ok value
    fromJsonResult (J.Error msg) = Bad (ParameterError msg)
77
78
-- | Collects every `OpCode` that the lens path below reaches inside a job.
opCodesOf :: QueuedJob -> [OpCode]
opCodesOf =
  (^.. qjOpsL . traverse . qoInputL . validOpCodeL . metaOpCodeL)
83
84
-- | Collects every `ReasonElem` that the lens path below reaches inside a
-- job (the reason entries found in the meta parameters of its opcodes).
reasonsOf :: QueuedJob -> [ReasonElem]
reasonsOf =
  (^.. qjOpsL . traverse . qoInputL . validOpCodeL
     . metaParamsL . opReasonL . traverse)
89
90
-- | A restricted variant of `evaluateFilterM` in which only `Comparator`
-- operations are supported; every other operation of the filter language
-- evaluates to `False`.
--
-- The callback receives the comparison function, the field and the value
-- to compare against. It is expected to return @Just True@ or @Just False@
-- according to the outcome of the comparison, and `Nothing` when the
-- comparison itself is invalid (e.g. it refers to a field that doesn't
-- exist). An overall `Nothing` is turned into `False`.
evaluateFilterComparator :: (Ord field)
                         => Filter field
                         -> (Comparator -> field -> FilterValue -> Maybe Bool)
                         -> Bool
evaluateFilterComparator fil opFun = fromMaybe False verdict
  where
    verdict :: Maybe Bool
    verdict =
      evaluateFilterM
        (\op -> case op of
           Comp c -> opFun (toCompFun c)
           _      -> \_ _ -> Nothing -- non-comparisons (become False)
        )
        fil
110
111
-- | Decides whether a single `FilterPredicate` holds for a job.
--
-- * `FPJobId`: only the field @"id"@ is understood. It can be compared
--   against a numeric value, or against the quoted string @"watermark"@,
--   which stands for the watermark job ID passed in. Anything else makes
--   the comparison invalid.
--
-- * `FPOpCode`: holds if the filter evaluates to true on the JSON
--   representation of at least one opcode of the job. Errors while
--   accessing fields or evaluating the filter count as "no match".
--
-- * `FPReason`: holds if the filter evaluates to true for at least one
--   reason trail entry of the job; the available fields are @"source"@,
--   @"reason"@ and @"timestamp"@.
matchPredicate :: QueuedJob
               -> JobId -- ^ the watermark to compare against
                        --   if the predicate references it
               -> FilterPredicate
               -> Bool
matchPredicate job watermark (FPJobId fil) =
  evaluateFilterComparator fil
    (\cmp field val -> case (field, val) of
        ("id", NumericValue n)           -> Just (numericId `cmp` n)
        ("id", QuotedString "watermark") -> Just (jobId `cmp` watermark)
        _                                -> Nothing)
  where
    jobId     = qjId job
    numericId = fromIntegral (fromJobId jobId)

matchPredicate job _ (FPOpCode fil) = any opMatches (opCodesOf job)
  where
    -- Resolve every field of the filter against the opcode's JSON first,
    -- then evaluate the resulting filter; any error means "no match".
    opMatches opc =
      genericResult (const False) id
        (traverse (accessOpCodeField opc) fil >>= evaluateFilterJSON)

matchPredicate job _ (FPReason fil) = any reasonMatches (reasonsOf job)
  where
    reasonMatches (source, reason, timestamp) =
      evaluateFilterComparator fil
        (\cmp field val -> fmap (`cmp` val) (fieldValue field))
      where
        -- The left-hand side of the comparison for each known field.
        fieldValue name = case name of
          "source"    -> Just (QuotedString source)
          "reason"    -> Just (QuotedString reason)
          "timestamp" -> Just (NumericValue timestamp)
          _           -> Nothing
145
146
-- | A filter rule matches a job if every one of its predicates holds for
-- that job; predicates are evaluated against the rule's own watermark.
matches :: QueuedJob -> FilterRule -> Bool
matches job rule =
  all (matchPredicate job (frWatermark rule)) (frPredicates rule)
151
152
-- | Lists the filter rules in the order in which the spec requires them
-- to be processed; the ordering itself is defined by `filterRuleOrder`.
orderFilters :: Set FilterRule -> [FilterRule]
orderFilters rules = sortBy filterRuleOrder (Set.toList rules)
157
158
-- | Determines the /applying/ filter of a job: going through the rules in
-- processing order, it is the first rule whose predicates all match the
-- job and whose action is something other than `Continue`.
--
-- Returns `Nothing` if there is no such rule.
applyingFilter :: Set FilterRule -> QueuedJob -> Maybe FilterRule
applyingFilter filters job = find applies (orderFilters filters)
  where
    -- Matching rules with a `Continue` action are skipped over.
    applies rule = matches job rule && Continue /= frAction rule
168
169
-- | The `SlotMap` used for rate limiting by filter rules. Its keys are the
-- UUIDs of the `FilterRule`s.
--
-- Keying by `FilterRule` itself would be preferable, but that type has no
-- Ord instance (yet).
type RateLimitSlotMap =
  SlotMap BS.ByteString
173
174
-- | The state that is accumulated while the filters are traversed.
data FilterChainState =
  FilterChainState
    { rateLimitSlotMap :: RateLimitSlotMap
      -- ^ slot counts of the rate limiting filters
    }
  deriving (Eq, Ord, Show)
179
180
-- | Tries to account for the given `CountMap` in the rate limiting
-- `SlotMap` of a `FilterChainState`.
--
-- Returns the state with the slots occupied if the slot map has room for
-- the counts, and `Nothing` otherwise.
tryFitSlots :: FilterChainState
            -> CountMap BS.ByteString
            -> Maybe FilterChainState
tryFitSlots st countMap
  | slots `hasSlotsFor` countMap =
      Just st { rateLimitSlotMap = slots `occupySlots` countMap }
  | otherwise = Nothing
  where
    slots = rateLimitSlotMap st
190
191
-- | Computes the rate limiting slot map for a job queue and a set of
-- filters: how many slots each rate limiting filter offers, and how many
-- of them are already taken by jobs in the queue.
--
-- The jobs considered are those in `qRunning` and `qManipulated`.
queueRateLimitSlotMap :: Queue -> Set FilterRule -> RateLimitSlotMap
queueRateLimitSlotMap queue filters = limits `occupySlots` taken
  where
    -- One slot entry per `RateLimit` filter, carrying the filter's limit
    -- and an occupied count of 0 (limits only).
    limits =
      Map.fromList
        [ (ruleId, Slot 0 limit)
        | FilterRule{ frUuid = ruleId
                    , frAction = RateLimit limit } <- Set.toList filters ]

    -- The jobs that may currently hold a slot.
    busyJobs = qRunning queue ++ qManipulated queue

    -- Number of slots taken per filter (counts only). A job takes a slot
    -- of a `RateLimit` filter if that filter is the one applying to it,
    -- i.e. the first one that matches the job.
    taken =
      Map.fromListWith (+)
        [ (ruleId, 1)
        | FilterRule{ frUuid = ruleId, frAction = RateLimit _ } <-
            mapMaybe (applyingFilter filters . jJob) busyJobs ]
216
217
-- | Job filtering, as specified in `doc/design-optables.rst`.
--
-- For each job, the filter that *applies* is the first one of which all
-- predicates match (see `applyingFilter`); its action decides the job's
-- fate:
--
-- * no applying filter, `Accept` or `Continue`: the job is kept;
--
-- * `Pause` or `Reject`: the job is dropped from the result;
--
-- * `RateLimit`: the job is kept only if the filter still has a free
--   slot, in which case the job occupies one slot for the jobs that
--   follow it in the list.
--
-- The initial `FilterChainState` is currently not cached across
-- `selectJobsToRun` invocations because the number of running jobs is
-- typically small (< 100).
jobFiltering :: Queue -> Set FilterRule -> [JobWithStat] -> [JobWithStat]
jobFiltering queue filters =
  catMaybes . snd . mapAccumL step initialState
  where
    -- Slots already taken by the jobs in the queue.
    initialState =
      FilterChainState
        { rateLimitSlotMap = queueRateLimitSlotMap queue filters }

    step :: FilterChainState
         -> JobWithStat
         -> (FilterChainState, Maybe JobWithStat)
    step st job =
      case applyingFilter filters (jJob job) of
        Nothing   -> keep -- no filter applies, accept job
        Just rule -> case frAction rule of
          Accept      -> keep
          Continue    -> keep
          Pause       -> discard
          Reject      -> discard
          RateLimit _ ->
            -- A matching job takes 1 slot.
            maybe discard
                  (\st' -> (st', Just job))
                  (tryFitSlots st (Map.singleton (frUuid rule) 1))
      where
        keep    = (st, Just job)
        discard = (st, Nothing)