Shutting down gRPC services gracefully

Fedor Korotkov
3 min read · Apr 11, 2018


Some of Cirrus CI’s microservices use gRPC streaming APIs. Once we introduced such APIs, a new problem appeared: a streaming call can now take several minutes instead of the few milliseconds a regular call takes. This caused Kubernetes, where we run our JVM services, to kill containers after the default shutdown grace period of 30 seconds, without waiting for in-flight gRPC calls to finish.

In this post we’ll describe how we handled this situation to minimize the impact on SLAs for users of these streaming APIs.

First things first, we need to let Kubernetes know that it can take a while to shut down our containers. By default, Kubernetes sends SIGTERM to the process with pid 1 and waits up to 30 seconds before killing the container.

Thankfully, the duration of the termination period can be configured via terminationGracePeriodSeconds in the PodSpec. In our case we used a Kotlin DSL for Kubernetes and simply modified the base deployment object for the affected service:

val kubeDeploy = BaseDeployment("my-service").apply {
    spec {
        template {
            spec {
                terminationGracePeriodSeconds = STREAMING_TIMEOUT_SECONDS
            }
        }
    }
}
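For readers not using a Kotlin DSL, the equivalent plain Kubernetes YAML looks roughly like this (the 300-second value and the image name are illustrative placeholders, not taken from our setup):

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-service
spec:
  template:
    spec:
      # Give in-flight streaming calls time to finish before SIGKILL.
      terminationGracePeriodSeconds: 300
      containers:
        - name: my-service
          image: my-service:latest
```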

Now we need to handle the SIGTERM that Kubernetes sends us. For that purpose we can use a shutdown hook like this:

Runtime.getRuntime().addShutdownHook(object : Thread() {
    override fun run() {
        grpcServer.shutdown()
        grpcServer.awaitTermination()
    }
})

But that’s not enough! It turned out that awaitTermination doesn’t wait for all streams to finish. We need to count open streams manually 😔 For that purpose I created an adapter over StreamObserver that increments a global counter when a stream is created and decrements it when the stream completes. Here is a Kotlin snippet of it:

import io.grpc.stub.StreamObserver
import java.util.concurrent.atomic.AtomicInteger

class StreamObserverPool {
    private val amountOfActiveObserversCounter = AtomicInteger(0)

    val amountOfActiveObservers: Int
        get() = amountOfActiveObserversCounter.get()

    fun <T> allocate(delegate: StreamObserver<T>): StreamObserver<T> {
        // A new stream is being opened.
        amountOfActiveObserversCounter.incrementAndGet()
        return object : StreamObserver<T> {
            override fun onNext(value: T) {
                delegate.onNext(value)
            }

            override fun onError(error: Throwable?) {
                delegate.onError(error)
                // The stream ended with an error.
                amountOfActiveObserversCounter.decrementAndGet()
            }

            override fun onCompleted() {
                delegate.onCompleted()
                // The stream completed normally.
                amountOfActiveObserversCounter.decrementAndGet()
            }
        }
    }
}
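The real class depends on gRPC’s StreamObserver, but the counting pattern can be exercised in isolation. Here is a self-contained sketch against a minimal stand-in interface (Observer and ObserverPool below are hypothetical names for illustration, not the gRPC types):

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Minimal stand-in for io.grpc.stub.StreamObserver, just for illustration.
interface Observer<T> {
    fun onNext(value: T)
    fun onError(t: Throwable?)
    fun onCompleted()
}

class ObserverPool {
    private val active = AtomicInteger(0)
    val activeCount: Int get() = active.get()

    fun <T> allocate(delegate: Observer<T>): Observer<T> {
        active.incrementAndGet() // a stream was opened
        return object : Observer<T> {
            override fun onNext(value: T) = delegate.onNext(value)
            override fun onError(t: Throwable?) {
                delegate.onError(t)
                active.decrementAndGet() // stream ended with an error
            }
            override fun onCompleted() {
                delegate.onCompleted()
                active.decrementAndGet() // stream completed normally
            }
        }
    }
}

fun main() {
    val pool = ObserverPool()
    val noop = object : Observer<String> {
        override fun onNext(value: String) {}
        override fun onError(t: Throwable?) {}
        override fun onCompleted() {}
    }
    val observer = pool.allocate(noop)
    println(pool.activeCount) // 1 while the stream is open
    observer.onCompleted()
    println(pool.activeCount) // back to 0
}
```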

Now, whenever there is a streaming gRPC call like this:

rpc UploadCache (stream CacheEntry) returns (UploadCacheResponse) {
}

A corresponding server implementation can look like this:

override fun uploadCache(
    responseObserver: StreamObserver<UploadCacheResponse>
): StreamObserver<CacheEntry> {
    return UploadCacheStream(
        // other arguments
        streamObserverPool.allocate(responseObserver)
    )
}

Here is the final shutdown hook:

Runtime.getRuntime().addShutdownHook(object : Thread() {
    override fun run() {
        grpcServer.shutdown()
        grpcServer.awaitTermination()
        waitFor(Duration.ofSeconds(STREAMING_TIMEOUT_SECONDS)) {
            streamObserverPool.amountOfActiveObservers <= 0
        }
    }
})

Where waitFor is a helper function that waits until a predicate is satisfied:

fun waitFor(maxDuration: Duration, predicate: () -> Boolean) {
    val endTime = System.currentTimeMillis() + maxDuration.toMillis()
    while (!predicate() && System.currentTimeMillis() < endTime) {
        Thread.sleep(200)
    }
    // Re-check the predicate: the loop may also exit because time ran out.
    if (!predicate()) {
        throw IllegalStateException("Timeout :-(")
    }
}
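As a self-contained sanity check of this helper (a sketch: the stream counter is simulated here with a plain AtomicInteger instead of the real StreamObserverPool):

```kotlin
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

fun waitFor(maxDuration: Duration, predicate: () -> Boolean) {
    val endTime = System.currentTimeMillis() + maxDuration.toMillis()
    while (!predicate() && System.currentTimeMillis() < endTime) {
        Thread.sleep(200)
    }
    if (!predicate()) {
        throw IllegalStateException("Timeout :-(")
    }
}

fun main() {
    val activeStreams = AtomicInteger(1)
    // Simulate a stream that finishes after half a second.
    thread {
        Thread.sleep(500)
        activeStreams.decrementAndGet()
    }
    waitFor(Duration.ofSeconds(5)) { activeStreams.get() <= 0 }
    println("all streams drained")
}
```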

Docker Note

Make sure to start your service via exec in a Docker container:

CMD exec java -jar service-all.jar ...

This ensures the Java process runs with pid 1, so SIGTERM reaches it and the shutdown hook is properly called.
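The exec prefix matters because a shell-form CMD runs the command under /bin/sh -c, making the shell pid 1 instead of the JVM. Docker’s exec-form CMD achieves the same result without the exec keyword (a sketch; the jar name follows the example above):

```dockerfile
# Shell form: /bin/sh is pid 1 and may not forward SIGTERM to the JVM.
# CMD java -jar service-all.jar

# Exec form: the JVM is started directly as pid 1 and receives SIGTERM.
CMD ["java", "-jar", "service-all.jar"]
```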

That’s all! Now there is no need to worry that deployments will close connections with clients: services will wait for all gRPC requests and streams to finish before terminating.

Please don’t hesitate to poke me with any questions on Twitter or in responses to this post.
