Shutting down GRPC services gracefully
Some of Cirrus CI's microservices use GRPC streaming APIs. Introducing them surfaced a problem: a streaming call can now take several minutes instead of the few milliseconds typical of regular calls. As a result Kubernetes, where we run our JVM services, was killing containers after the default shutdown period of 30 seconds without waiting for all GRPC calls to finish.
In this post we'll describe how we handled such situations to minimize the impact on SLAs for users of these streaming APIs.
First things first, we need to let Kubernetes know that it can take a while to shut down our containers. By default Kubernetes sends SIGTERM to the process with pid 1 and waits up to 30 seconds before killing the container.
Thankfully, the duration of the termination period can be configured via terminationGracePeriodSeconds in the PodSpec. In our case we use a Kotlin DSL for Kubernetes and simply modified the base deployment object of the affected service:
val kubeDeploy = BaseDeployment("my-service").apply {
  spec {
    template {
      spec {
        terminationGracePeriodSeconds = STREAMING_TIMEOUT_SECONDS
      }
    }
  }
}
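Here STREAMING_TIMEOUT_SECONDS is a constant we'll reference again in the shutdown hook below. Its concrete value should be an upper bound for your longest-running streams; the number below is just a hypothetical example:
const val STREAMING_TIMEOUT_SECONDS: Long = 600L // hypothetical value: 10 minutes, an upper bound for the longest expected stream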
Now we need to handle the SIGTERM that Kubernetes sends us. For that purpose we can use a shutdown hook like this:
Runtime.getRuntime().addShutdownHook(object : Thread() {
  override fun run() {
    grpcServer.shutdown()
    grpcServer.awaitTermination()
  }
})
But that's not enough! It turned out awaitTermination doesn't wait for all streams to finish. We need to count open streams manually 😔 For that purpose I created an adapter over StreamObserver that increments a global counter when a stream is created and decrements it when the stream completes. Here is a Kotlin snippet of it:
import io.grpc.stub.StreamObserver
import java.util.concurrent.atomic.AtomicInteger

class StreamObserverPool {
  private val amountOfActiveObserversCounter = AtomicInteger(0)

  val amountOfActiveObservers: Int
    get() = amountOfActiveObserversCounter.get()

  // Wraps a StreamObserver so the pool knows when a stream starts and finishes.
  fun <T> allocate(delegate: StreamObserver<T>): StreamObserver<T> {
    amountOfActiveObserversCounter.incrementAndGet()
    return object : StreamObserver<T> {
      override fun onNext(p0: T) {
        delegate.onNext(p0)
      }

      override fun onError(p0: Throwable?) {
        delegate.onError(p0)
        amountOfActiveObserversCounter.decrementAndGet()
      }

      override fun onCompleted() {
        delegate.onCompleted()
        amountOfActiveObserversCounter.decrementAndGet()
      }
    }
  }
}
Now whenever there is a streaming GRPC call like this:
rpc UploadCache (stream CacheEntry) returns (UploadCacheResponse) {
}
A corresponding server implementation can look like this:
override fun uploadCache(
  responseObserver: StreamObserver<UploadCacheResponse>
): StreamObserver<CacheEntry> {
  return UploadCacheStream(
    // other arguments
    streamObserverPool.allocate(responseObserver)
  )
}
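To tie things together, the pool has to be a single instance shared between the service implementations and the shutdown hook. A minimal wiring sketch, where CacheServiceImpl and the port are hypothetical placeholders rather than the actual service:
import io.grpc.ServerBuilder

// One shared pool instance, visible to both the services and the shutdown hook.
val streamObserverPool = StreamObserverPool()

// Hypothetical server setup: CacheServiceImpl and port 8080 are placeholders.
val grpcServer = ServerBuilder
  .forPort(8080)
  .addService(CacheServiceImpl(streamObserverPool))
  .build()
  .start()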
Here is the final shutdown hook:
Runtime.getRuntime().addShutdownHook(object : Thread() {
  override fun run() {
    grpcServer.shutdown()
    grpcServer.awaitTermination()
    waitFor(Duration.ofSeconds(STREAMING_TIMEOUT_SECONDS)) {
      streamObserverPool.amountOfActiveObservers <= 0
    }
  }
})
Where waitFor is a helper function that waits for a predicate to be satisfied:
import java.time.Duration

fun waitFor(maxDuration: Duration, predicate: () -> Boolean) {
  val endTime = System.currentTimeMillis() + maxDuration.toMillis()
  while (!predicate() && System.currentTimeMillis() < endTime) {
    Thread.sleep(200)
  }
  // Throw only if we ran out of time without the predicate ever becoming true.
  if (!predicate()) {
    throw IllegalStateException("Timeout :-(")
  }
}
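As a quick self-contained illustration (not from the original code base), waitFor just polls every 200ms until the condition holds or the deadline passes:
import java.time.Duration
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical demonstration: the flag flips after one second on another thread.
val flag = AtomicBoolean(false)
Thread { Thread.sleep(1000); flag.set(true) }.start()

// Returns once the flag flips; would throw IllegalStateException after 5 seconds otherwise.
waitFor(Duration.ofSeconds(5)) { flag.get() }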
Docker Note
Make sure to start your service via exec in a Docker container:
CMD exec java -jar service-all.jar ...
This makes sure the Java process has pid 1, so the shutdown hook will be properly called.
That's all! There is no longer any need to worry that a deployment will abruptly close connections with clients: services will now wait for all GRPC requests and streams to finish before terminating.
Please don’t hesitate to poke me with any questions on Twitter or in responses to this post.