1/*2* Copyright (C) 2012 The Guava Authors3*4* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except5* in compliance with the License. You may obtain a copy of the License at6*7* http://www.apache.org/licenses/LICENSE-2.08*9* Unless required by applicable law or agreed to in writing, software distributed under the License10* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express11* or implied. See the License for the specific language governing permissions and limitations under12* the License.13*/14 15packagecom.google.common.util.concurrent; 16 17importstaticjava.lang.Math.min; 18importstaticjava.util.concurrent.TimeUnit.SECONDS; 19 20importcom.google.common.annotations.GwtIncompatible; 21importcom.google.common.math.LongMath; 22importjava.util.concurrent.TimeUnit; 23 24 @GwtIncompatible 25abstractclassSmoothRateLimiterextendsRateLimiter { 26/*27* How is the RateLimiter designed, and why?28*29* The primary feature of a RateLimiter is its "stable rate", the maximum rate that is should30* allow at normal conditions. This is enforced by "throttling" incoming requests as needed, i.e.31* compute, for an incoming request, the appropriate throttle time, and make the calling thread32* wait as much.33*34* The simplest way to maintain a rate of QPS is to keep the timestamp of the last granted35* request, and ensure that (1/QPS) seconds have elapsed since then. For example, for a rate of36* QPS=5 (5 tokens per second), if we ensure that a request isn't granted earlier than 200ms after37* the last one, then we achieve the intended rate. If a request comes and the last request was38* granted only 100ms ago, then we wait for another 100ms. At this rate, serving 15 fresh permits39* (i.e. for an acquire(15) request) naturally takes 3 seconds.40*41* It is important to realize that such a RateLimiter has a very superficial memory of the past:42* it only remembers the last request. What if the RateLimiter was unused for a long period of43* time, then a request arrived and was immediately granted? This RateLimiter would immediately44* forget about that past underutilization. This may result in either underutilization or45* overflow, depending on the real world consequences of not using the expected rate.46*47* Past underutilization could mean that excess resources are available. Then, the RateLimiter48* should speed up for a while, to take advantage of these resources. This is important when the49* rate is applied to networking (limiting bandwidth), where past underutilization typically50* translates to "almost empty buffers", which can be filled immediately.51*52* On the other hand, past underutilization could mean that "the server responsible for handling53* the request has become less ready for future requests", i.e. its caches become stale, and54* requests become more likely to trigger expensive operations (a more extreme case of this55* example is when a server has just booted, and it is mostly busy with getting itself up to56* speed).57*58* To deal with such scenarios, we add an extra dimension, that of "past underutilization",59* modeled by "storedPermits" variable. This variable is zero when there is no underutilization,60* and it can grow up to maxStoredPermits, for sufficiently large underutilization. So, the61* requested permits, by an invocation acquire(permits), are served from:62*63* - stored permits (if available)64*65* - fresh permits (for any remaining permits)66*67* How this works is best explained with an example:68*69* For a RateLimiter that produces 1 token per second, every second that goes by with the70* RateLimiter being unused, we increase storedPermits by 1. Say we leave the RateLimiter unused71* for 10 seconds (i.e., we expected a request at time X, but we are at time X + 10 seconds before72* a request actually arrives; this is also related to the point made in the last paragraph), thus73* storedPermits becomes 10.0 (assuming maxStoredPermits >= 10.0). At that point, a request of74* acquire(3) arrives. We serve this request out of storedPermits, and reduce that to 7.0 (how75* this is translated to throttling time is discussed later). Immediately after, assume that an76* acquire(10) request arriving. We serve the request partly from storedPermits, using all the77* remaining 7.0 permits, and the remaining 3.0, we serve them by fresh permits produced by the78* rate limiter.79*80* We already know how much time it takes to serve 3 fresh permits: if the rate is81* "1 token per second", then this will take 3 seconds. But what does it mean to serve 7 stored82* permits? As explained above, there is no unique answer. If we are primarily interested to deal83* with underutilization, then we want stored permits to be given out /faster/ than fresh ones,84* because underutilization = free resources for the taking. If we are primarily interested to85* deal with overflow, then stored permits could be given out /slower/ than fresh ones. Thus, we86* require a (different in each case) function that translates storedPermits to throtting time.87*88* This role is played by storedPermitsToWaitTime(double storedPermits, double permitsToTake). The89* underlying model is a continuous function mapping storedPermits (from 0.0 to maxStoredPermits)90* onto the 1/rate (i.e. intervals) that is effective at the given storedPermits. "storedPermits"91* essentially measure unused time; we spend unused time buying/storing permits. Rate is92* "permits / time", thus "1 / rate = time / permits". Thus, "1/rate" (time / permits) times93* "permits" gives time, i.e., integrals on this function (which is what storedPermitsToWaitTime()94* computes) correspond to minimum intervals between subsequent requests, for the specified number95* of requested permits.96*97* Here is an example of storedPermitsToWaitTime: If storedPermits == 10.0, and we want 3 permits,98* we take them from storedPermits, reducing them to 7.0, and compute the throttling for these as99* a call to storedPermitsToWaitTime(storedPermits = 10.0, permitsToTake = 3.0), which will100* evaluate the integral of the function from 7.0 to 10.0.101*102* Using integrals guarantees that the effect of a single acquire(3) is equivalent to {103* acquire(1); acquire(1); acquire(1); }, or { acquire(2); acquire(1); }, etc, since the integral104* of the function in [7.0, 10.0] is equivalent to the sum of the integrals of [7.0, 8.0], [8.0,105* 9.0], [9.0, 10.0] (and so on), no matter what the function is. This guarantees that we handle106* correctly requests of varying weight (permits), /no matter/ what the actual function is - so we107* can tweak the latter freely. (The only requirement, obviously, is that we can compute its108* integrals).109*110* Note well that if, for this function, we chose a horizontal line, at height of exactly (1/QPS),111* then the effect of the function is non-existent: we serve storedPermits at exactly the same112* cost as fresh ones (1/QPS is the cost for each). We use this trick later.113*114* If we pick a function that goes /below/ that horizontal line, it means that we reduce the area115* of the function, thus time. Thus, the RateLimiter becomes /faster/ after a period of116* underutilization. If, on the other hand, we pick a function that goes /above/ that horizontal117* line, then it means that the area (time) is increased, thus storedPermits are more costly than118* fresh permits, thus the RateLimiter becomes /slower/ after a period of underutilization.119*120* Last, but not least: consider a RateLimiter with rate of 1 permit per second, currently121* completely unused, and an expensive acquire(100) request comes. It would be nonsensical to just122* wait for 100 seconds, and /then/ start the actual task. Why wait without doing anything? A much123* better approach is to /allow/ the request right away (as if it was an acquire(1) request124* instead), and postpone /subsequent/ requests as needed. In this version, we allow starting the125* task immediately, and postpone by 100 seconds future requests, thus we allow for work to get126* done in the meantime instead of waiting idly.127*128* This has important consequences: it means that the RateLimiter doesn't remember the time of the129* _last_ request, but it remembers the (expected) time of the _next_ request. This also enables130* us to tell immediately (see tryAcquire(timeout)) whether a particular timeout is enough to get131* us to the point of the next scheduling time, since we always maintain that. And what we mean by132* "an unused RateLimiter" is also defined by that notion: when we observe that the133* "expected arrival time of the next request" is actually in the past, then the difference (now -134* past) is the amount of time that the RateLimiter was formally unused, and it is that amount of135* time which we translate to storedPermits. (We increase storedPermits with the amount of permits136* that would have been produced in that idle time). So, if rate == 1 permit per second, and137* arrivals come exactly one second after the previous, then storedPermits is _never_ increased --138* we would only increase it for arrivals _later_ than the expected one second.139*/140 141/**142* This implements the following function where coldInterval = coldFactor * stableInterval.143*144* <pre>145* ^ throttling146* |147* cold + /148* interval | /.149* | / .150* | / . ← "warmup period" is the area of the trapezoid between151* | / . thresholdPermits and maxPermits152* | / .153* | / .154* | / .155* stable +----------/ WARM .156* interval | . UP .157* | . PERIOD.158* | . .159* 0 +----------+-------+--------------→ storedPermits160* 0 thresholdPermits maxPermits161* </pre>162*163* Before going into the details of this particular function, let's keep in mind the basics:164*165* <ol>166* <li>The state of the RateLimiter (storedPermits) is a vertical line in this figure.167* <li>When the RateLimiter is not used, this goes right (up to maxPermits)168* <li>When the RateLimiter is used, this goes left (down to zero), since if we have169* storedPermits, we serve from those first170* <li>When _unused_, we go right at a constant rate! The rate at which we move to the right is171* chosen as maxPermits / warmupPeriod. This ensures that the time it takes to go from 0 to172* maxPermits is equal to warmupPeriod.173* <li>When _used_, the time it takes, as explained in the introductory class note, is equal to174* the integral of our function, between X permits and X-K permits, assuming we want to175* spend K saved permits.176* </ol>177*178* <p>In summary, the time it takes to move to the left (spend K permits), is equal to the area of179* the function of width == K.180*181* <p>Assuming we have saturated demand, the time to go from maxPermits to thresholdPermits is182* equal to warmupPeriod. And the time to go from thresholdPermits to 0 is warmupPeriod/2. (The183* reason that this is warmupPeriod/2 is to maintain the behavior of the original implementation184* where coldFactor was hard coded as 3.)185*186* <p>It remains to calculate thresholdsPermits and maxPermits.187*188* <ul>189* <li>The time to go from thresholdPermits to 0 is equal to the integral of the function190* between 0 and thresholdPermits. This is thresholdPermits * stableIntervals. By (5) it is191* also equal to warmupPeriod/2. Therefore192* <blockquote>193* thresholdPermits = 0.5 * warmupPeriod / stableInterval194* </blockquote>195*196* <li>The time to go from maxPermits to thresholdPermits is equal to the integral of the197* function between thresholdPermits and maxPermits. This is the area of the pictured198* trapezoid, and it is equal to 0.5 * (stableInterval + coldInterval) * (maxPermits -199* thresholdPermits). It is also equal to warmupPeriod, so200* <blockquote>201* maxPermits = thresholdPermits + 2 * warmupPeriod / (stableInterval + coldInterval)202* </blockquote>203*204* </ul>205*/206staticfinalclassSmoothWarmingUpextendsSmoothRateLimiter { 207privatefinallongwarmupPeriodMicros; 208/**209* The slope of the line from the stable interval (when permits == 0), to the cold interval210* (when permits == maxPermits)211*/212privatedoubleslope; 213privatedoublethresholdPermits; 214privatedoublecoldFactor; 215 216 SmoothWarmingUp( 217 SleepingStopwatch stopwatch,longwarmupPeriod, TimeUnit timeUnit,doublecoldFactor) { 218super(stopwatch); 219this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod); 220this.coldFactor = coldFactor; 221 } 222 223 @Override 224voiddoSetRate(doublepermitsPerSecond,doublestableIntervalMicros) { 225doubleoldMaxPermits = maxPermits; 226doublecoldIntervalMicros = stableIntervalMicros * coldFactor; 227 thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros; 228 maxPermits = 229 thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros); 230 slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits); 231if(oldMaxPermits == Double.POSITIVE_INFINITY) { 232// if we don't special-case this, we would get storedPermits == NaN, below233 storedPermits = 0.0; 234 }else{ 235 storedPermits = 236 (oldMaxPermits == 0.0) 237 ? maxPermits// initial state is cold238 : storedPermits * maxPermits / oldMaxPermits; 239 } 240 } 241 242 @Override 243longstoredPermitsToWaitTime(doublestoredPermits,doublepermitsToTake) { 244doubleavailablePermitsAboveThreshold = storedPermits - thresholdPermits; 245longmicros = 0; 246// measuring the integral on the right part of the function (the climbing line)247if(availablePermitsAboveThreshold > 0.0) { 248doublepermitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake); 249// TODO(cpovirk): Figure out a good name for this variable.250doublelength = permitsToTime(availablePermitsAboveThreshold) 251 + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake); 252 micros = (long) (permitsAboveThresholdToTake * length / 2.0); 253 permitsToTake -= permitsAboveThresholdToTake; 254 } 255// measuring the integral on the left part of the function (the horizontal line)256 micros += (long) (stableIntervalMicros * permitsToTake); 257returnmicros; 258 } 259 260privatedoublepermitsToTime(doublepermits) { 261returnstableIntervalMicros + permits * slope; 262 } 263 264 @Override 265doublecoolDownIntervalMicros() { 266returnwarmupPeriodMicros / maxPermits; 267 } 268 } 269 270/**271* This implements a "bursty" RateLimiter, where storedPermits are translated to zero throttling.272* The maximum number of permits that can be saved (when the RateLimiter is unused) is defined in273* terms of time, in this sense: if a RateLimiter is 2qps, and this time is specified as 10274* seconds, we can save up to 2 * 10 = 20 permits.275*/276staticfinalclassSmoothBurstyextendsSmoothRateLimiter { 277/** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */278finaldoublemaxBurstSeconds; 279 280 SmoothBursty(SleepingStopwatch stopwatch,doublemaxBurstSeconds) { 281super(stopwatch); 282this.maxBurstSeconds = maxBurstSeconds; 283 } 284 285 @Override 286voiddoSetRate(doublepermitsPerSecond,doublestableIntervalMicros) { 287doubleoldMaxPermits =this.maxPermits; 288 maxPermits = maxBurstSeconds * permitsPerSecond; 289if(oldMaxPermits == Double.POSITIVE_INFINITY) { 290// if we don't special-case this, we would get storedPermits == NaN, below291 storedPermits = maxPermits; 292 }else{ 293 storedPermits = 294 (oldMaxPermits == 0.0) 295 ? 0.0// initial state296 : storedPermits * maxPermits / oldMaxPermits; 297 } 298 } 299 300 @Override 301longstoredPermitsToWaitTime(doublestoredPermits,doublepermitsToTake) { 302return0L; 303 } 304 305 @Override 306doublecoolDownIntervalMicros() { 307returnstableIntervalMicros; 308 } 309 } 310 311/**312* The currently stored permits.313*/314doublestoredPermits; 315 316/**317* The maximum number of stored permits.318*/319doublemaxPermits; 320 321/**322* The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits323* per second has a stable interval of 200ms.324*/325doublestableIntervalMicros; 326 327/**328* The time when the next request (no matter its size) will be granted. After granting a request,329* this is pushed further in the future. Large requests push this further than small requests.330*/331privatelongnextFreeTicketMicros = 0L;// could be either in the past or future332 333privateSmoothRateLimiter(SleepingStopwatch stopwatch) { 334super(stopwatch); 335 } 336 337 @Override 338finalvoiddoSetRate(doublepermitsPerSecond,longnowMicros) { 339 resync(nowMicros); 340doublestableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond; 341this.stableIntervalMicros = stableIntervalMicros; 342 doSetRate(permitsPerSecond, stableIntervalMicros); 343 } 344 345abstractvoiddoSetRate(doublepermitsPerSecond,doublestableIntervalMicros); 346 347 @Override 348finaldoubledoGetRate() { 349returnSECONDS.toMicros(1L) / stableIntervalMicros; 350 } 351 352 @Override 353finallongqueryEarliestAvailable(longnowMicros) { 354returnnextFreeTicketMicros; 355 } 356 357 @Override 358finallongreserveEarliestAvailable(intrequiredPermits,longnowMicros) { 359 resync(nowMicros); 360longreturnValue = nextFreeTicketMicros; 361doublestoredPermitsToSpend = min(requiredPermits,this.storedPermits); 362doublefreshPermits = requiredPermits - storedPermitsToSpend; 363longwaitMicros = 364 storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) 365 + (long) (freshPermits * stableIntervalMicros); 366 367this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); 368this.storedPermits -= storedPermitsToSpend; 369returnreturnValue; 370 } 371 372/**373* Translates a specified portion of our currently stored permits which we want to spend/acquire,374* into a throttling time. Conceptually, this evaluates the integral of the underlying function we375* use, for the range of [(storedPermits - permitsToTake), storedPermits].376*377* <p>This always holds: {@code 0 <= permitsToTake <= storedPermits}378*/379abstractlongstoredPermitsToWaitTime(doublestoredPermits,doublepermitsToTake); 380 381/**382* Returns the number of microseconds during cool down that we have to wait to get a new permit.383*/384abstractdoublecoolDownIntervalMicros(); 385 386/**387* Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time.388*/389voidresync(longnowMicros) { 390// if nextFreeTicket is in the past, resync to now391if(nowMicros > nextFreeTicketMicros) { 392doublenewPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); 393 storedPermits = min(maxPermits, storedPermits + newPermits); 394 nextFreeTicketMicros = nowMicros; 395 } 396 } 397 }