Rate Limiting with Reactive Extensions or LINQ

24/06/2015

Sometimes you need to send a lot of requests to a service or system, and your own system can send them faster than the receiver can handle. Or maybe you just don’t want to max out the system you are calling. In either case, you need to implement rate limiting.

The most naive way of doing it is to call Thread.Sleep between requests. That immediately looks like a code smell, but depending on what you are doing you might be able to live with it.
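To make the naive approach concrete, here is a minimal sketch of it. The NaiveRateLimit class and its Run method are hypothetical names made up for this illustration; only Thread.Sleep comes from the framework.
[csharp]
using System;
using System.Threading;

// Hypothetical helper, for illustration only.
public static class NaiveRateLimit
{
    // Runs the action `count` times and blocks the calling thread
    // for `minDelay` before every call except the first one.
    public static void Run(Action action, int count, TimeSpan minDelay)
    {
        for (var i = 0; i < count; i++)
        {
            if (i > 0)
            {
                Thread.Sleep(minDelay);
            }
            action();
        }
    }
}
[/csharp]
Calling NaiveRateLimit.Run(PerformAction, 1000, TimeSpan.FromMilliseconds(500)) would give roughly two calls per second. The obvious downside is that the calling thread is blocked for the whole run, which is part of why it smells.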

If you want a nicer approach, you can look into Reactive Extensions or LINQ methods that can do the rate limiting for you. Reactive Extensions might be a little too much if you are not already doing any sort of reactive programming, but if you are, I would recommend that approach.

Rate limiting with LINQ

For a simple LINQ extension that can do rate limiting, I can recommend Jack Leitch’s implementation of what he calls a RateGate. It’s simple to understand and even simpler to use.

In its simplest form you can use it like this:
[csharp]
// Create a RateGate that allows 2 occurrences per second.
using (var rateGate = new RateGate(2, TimeSpan.FromSeconds(1)))
{
    for (var i = 0; i < 1000; i++)
    {
        rateGate.WaitToProceed();
        PerformAction();
    }
}
[/csharp]
And if you want to use it as a LINQ extension, you can build it like this:
[csharp]
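using System;
using System.Collections.Generic;

// Extension methods must live in a static class; the class name here is arbitrary.
public static class EnumerableExtensions
{
    // Yields the items of the sequence lazily, waiting on the RateGate before
    // each one, so at most `items` are handed out per `timePeriod`.
    public static IEnumerable<T> LimitRate<T>(this IEnumerable<T> sequence, int items, TimeSpan timePeriod)
    {
        using (var rateGate = new RateGate(items, timePeriod))
        {
            foreach (var item in sequence)
            {
                rateGate.WaitToProceed();
                yield return item;
            }
        }
    }
}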
[/csharp]
You can grab the code for the RateGate from Jack’s site or from this GitHub repo: https://github.com/Danthar/RateLimiting.

Rate Limiting with Reactive Extensions

OK, so I teased you with rate limiting using Reactive Extensions, so I guess I have to show an example of how that can be done.

It could look like this:
[csharp]
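using System;
using System.Reactive.Linq;

public static class RxExtension
{
    public static IObservable<T> RateLimit<T>(this IObservable<T> source, TimeSpan minDelay)
    {
        // Each item becomes a small inner sequence that emits the item right away
        // and completes only after minDelay has passed. Concat subscribes to the
        // next inner sequence once the previous one has completed.
        return source.Select(x =>
            Observable.Empty<T>()
                .Delay(minDelay)
                .StartWith(x)
        ).Concat();
    }
}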
[/csharp]
The minDelay here is the minimum time between requests. So if you want to limit to, e.g., 500 requests per minute, you would pass TimeSpan.FromSeconds(60.0/500.0), which is 120 milliseconds.
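If you prefer to think in "N requests per period" rather than in a delay, the conversion can be wrapped in a small helper. This is just a sketch; RateLimitMath and MinDelayFor are hypothetical names and not part of Reactive Extensions.
[csharp]
using System;

// Hypothetical helper, for illustration only.
public static class RateLimitMath
{
    // Converts "requests per period" into the minimum delay between two requests.
    public static TimeSpan MinDelayFor(int requests, TimeSpan period)
    {
        if (requests <= 0)
        {
            throw new ArgumentOutOfRangeException("requests", "Must be greater than zero.");
        }
        // Integer tick division; any remainder smaller than one tick is dropped.
        return TimeSpan.FromTicks(period.Ticks / requests);
    }
}
[/csharp]
For 500 requests per minute, RateLimitMath.MinDelayFor(500, TimeSpan.FromMinutes(1)) gives the same 60/500 seconds as above, and the result can be passed straight to RateLimit as the minDelay.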

A small sample program that shows the reactive extension used for rate limiting can be found on my GitHub account: https://github.com/sjkp/Reactive-Extensions-RateLimiting.