Buffered batched loading optimization with Kotlin Coroutines and Channels
Imagine you are loading some objects via an API. For example, you are loading values from a key-value storage. Most likely, your underlying implementation is using Dispatchers.IO
or a custom dispatcher to execute the actual calls to the key-value store’s API. In any case, the dispatcher of your choice has an upper bound on how many API requests it can perform in parallel.
What if you end up in a situation where the throughput of the dispatcher executing your API requests is not enough and new requests start to queue up? In this blog post we’ll see how we can use Kotlin Coroutines, and Channels specifically, to organize buffering and batching of such requests and get a 4x performance improvement with 8 times fewer threads.

This optimization technique comes from a real-world problem. Some of Cirrus CI’s clients create hundreds of CI tasks for every single commit. Cirrus CI, on the other hand, consists of 20+ microservices which all work independently, and many of them are subscribed to task creation in order to perform various functions. This leads to a situation where hundreds of simultaneously created tasks produce thousands of requests to a single microservice within a couple of milliseconds, all loading pretty much the same objects.
Another important observation about most key-value storages is that the response times for loading objects by a couple of ids or by a couple dozen ids are almost the same. Cirrus CI uses Google’s Datastore as a scalable key-value storage. In the case of this particular microservice, the p99 response time from Datastore is 20ms.
Enough words! Let’s get to the implementation!
Let’s start by defining the Loader interface that we are going to optimize:
interface Loader<ID, T> {
  suspend fun loadByIds(ids: Set<ID>): Map<ID, T>
}
Essentially the optimization consists of two steps:
- Instead of calling the API right away, the loadByIds method will queue a request for loading via a Channel and “return” a Deferred which can be awaited.
- There will be a pool of workers which will pull as many requests as possible from the Channel, combine these requests into a single request (batching) and finally call the API with the new request.
Let’s start with the first part: defining what our requests for loading will look like and how the implementation of loadByIds will change.
private data class LoadRequest<ID, T>(
  val ids: Set<ID>,
  val result: CompletableDeferred<Map<ID, T>>
)
LoadRequest leverages CompletableDeferred to make it possible to queue the request and then simply await its completion:
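The queue-and-await flow itself isn’t listed above, so here is a minimal sketch of how it might look. The class name BatchLoader and the delegateLoader constructor parameter follow the usage later in the post; the unlimited channel capacity is an assumption.
class BatchLoader<ID, T>(
  private val delegateLoader: Loader<ID, T>
) : Loader<ID, T> {
  // Buffered channel so callers can enqueue requests without suspending.
  private val requests = Channel<LoadRequest<ID, T>>(Channel.UNLIMITED)

  override suspend fun loadByIds(ids: Set<ID>): Map<ID, T> {
    val request = LoadRequest(ids, CompletableDeferred<Map<ID, T>>())
    requests.send(request)         // queue the request for a worker to pick up
    return request.result.await()  // suspend until a worker completes it
  }
}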
Now let’s create a worker pool:
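The exact pool setup isn’t shown here, but a minimal version is just a fixed number of coroutines launched on a dedicated dispatcher, all consuming from the same channel. The 8-thread executor and the batchWorker name are assumptions; 8 matches the thread count from the benchmark at the end of the post.
// A pool of worker coroutines sharing the requests channel: each queued
// LoadRequest is picked up by exactly one worker.
private val workerDispatcher = Executors.newFixedThreadPool(8).asCoroutineDispatcher()
private val workerScope = CoroutineScope(workerDispatcher + SupervisorJob())

init {
  repeat(8) {
    workerScope.launch { batchWorker() }
  }
}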
And finally let’s take a look at what a worker will do with the channel of requests. A worker will receive a request from the channel, then check whether there are more requests available and take as many as it can until it reaches some threshold of unique keys to load objects for.
A little trick here is how to check whether a channel is empty or has more values in it. By default, the receive method of a channel will suspend the caller while the channel is empty. To do the trick we can take advantage of the select expression and some experimental API (onTimeout(0) will be selected immediately if the channel is empty):
val additionalRequest: LoadRequest<ID, T>? =
  select {
    requests.onReceive { it }
    onTimeout(0) { null }
  }
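Putting the trick to work, a worker’s draining loop might look roughly like the sketch below. The batchWorker name, the MAX_IDS_PER_BATCH threshold, and the requestsToProcess/idsToLoad variables are assumptions chosen to match the final snippet that follows.
private suspend fun batchWorker() {
  while (true) {
    // Suspend until at least one request arrives.
    val firstRequest = requests.receive()
    val requestsToProcess = mutableListOf(firstRequest)
    val idsToLoad = firstRequest.ids.toMutableSet()

    // Drain whatever else is already queued, up to a threshold of unique ids.
    // MAX_IDS_PER_BATCH is a tuning constant (assumption).
    while (idsToLoad.size < MAX_IDS_PER_BATCH) {
      val additionalRequest: LoadRequest<ID, T>? = select {
        requests.onReceive { it }
        onTimeout(0) { null }
      }
      if (additionalRequest == null) break
      requestsToProcess += additionalRequest
      idsToLoad += additionalRequest.ids
    }

    // ... then load idsToLoad via delegateLoader and complete each request (see below)
  }
}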
And finally, once all of the available requests to process are collected, a worker can call the API with the combined set of ids and complete the results for the requests:
try {
  val loadedObjects = delegateLoader.loadByIds(idsToLoad)
  requestsToProcess.forEach { processedRequest ->
    processedRequest.result.complete(
      loadedObjects.filterKeys { processedRequest.ids.contains(it) }
    )
  }
} catch (e: Exception) {
  requestsToProcess.forEach { it.result.completeExceptionally(e) }
}
Here is the full code of BatchLoader on GitHub with comments.
Testing
For testing I’ve created a simple TestLoader that emulates API calls with a response time from 10ms to 30ms.
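The TestLoader implementation isn’t listed in the post body, but a minimal sketch could look like this. Returning each id mapped to itself is an assumption that matches the assertions in loadTest below.
// Pretends to call a remote key-value store: waits 10..30ms and returns
// every requested id mapped to itself as the loaded value.
class TestLoader : Loader<Int, Int> {
  override suspend fun loadByIds(ids: Set<Int>): Map<Int, Int> {
    delay(Random.nextLong(10, 31))
    return ids.associateWith { it }
  }
}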
The loadTest function will emulate creating 3,000 requests to load 300 different objects:
suspend fun loadTest(loader: Loader<Int, Int>) = coroutineScope {
  val requests = 3_000
  (1..requests).map { step ->
    val idToLoad = step % 300
    async {
      val result = loader.loadByIds(setOf(idToLoad))
      assertEquals(1, result.size)
      assertEquals(idToLoad, result[idToLoad])
    }
  }.awaitAll()
}
And finally we’ll just measure how long it takes to process all 3,000 requests for a regular loader and for the batched loader:
val regularTime = measureTime { loadTest(TestLoader()) }
val batchedTime = measureTime {
  loadTest(BatchLoader(delegateLoader = TestLoader()))
}
I’ve measured times in a 2 CPU/4 GB Docker container and the regular time was around 1000ms, which is expected: each request takes 20ms on average over the 64 threads of Dispatchers.IO, which theoretically should handle 3,000 requests in (3000/64)*20ms ≈ 937ms. The batched loader showed times around 250ms, which is 4 times faster while using only 8 threads instead of 64.
Here is a GitHub repository with all the code samples under the MIT license.
If you have any questions or concerns please feel free to find me on Twitter 😉