Measuring execution time in Kotlin coroutines

1. Background

kotlinx.coroutines is one of Kotlin's libraries for writing asynchronous, non-blocking, and concurrent code. Kotlin coroutines are cooperative subroutines that can suspend and resume their execution at suspension points (e.g., while awaiting a result). Coroutines themselves have been around since the 1960s. They provide lightweight cooperative multitasking and are often referred to as 'lightweight threads.' A coroutine yields its thread when it suspends, and other coroutines can then use that thread to start or continue their execution. Thus, switching between coroutines does not depend on the operating system preempting them (in kotlinx.coroutines, a dispatcher decides which thread a coroutine resumes on). Threads, on the other hand, are based on preemptive multitasking, which involves interrupts, a scheduler, and the operating system. Threads can run concurrently or, on multiple cores, in parallel.
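The following minimal sketch makes "yield control upon suspension" concrete. It uses only the low-level `kotlin.coroutines` primitives from the standard library, not kotlinx.coroutines. The names `awaitResult`, `parked`, `events`, and `runDemo` are hypothetical. The coroutine suspends and the caller gets control back on the same thread. The coroutine continues only when something explicitly resumes it, and no preemption is involved.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

val events = mutableListOf<String>()
var parked: Continuation<Unit>? = null   // hypothetical slot holding the suspended coroutine

// Suspension point: store the continuation and hand the thread back to the caller.
suspend fun awaitResult() = suspendCoroutine<Unit> { cont -> parked = cont }

fun runDemo(): List<String> {
    val body: suspend () -> Unit = {
        events += "coroutine: started"
        awaitResult()
        events += "coroutine: resumed"
    }
    // Runs `body` on the current thread until its first suspension point.
    body.startCoroutine(Continuation(EmptyCoroutineContext) { events += "coroutine: completed" })
    events += "caller: got control back"
    parked?.resume(Unit)   // cooperative hand-off: the coroutine runs again only when resumed
    return events
}
```

Calling `runDemo()` yields the events in the order started, caller regains control, resumed, completed. kotlinx.coroutines builders and dispatchers are built on top of these same primitives and take care of the parking and resuming for you.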

One of the challenges when using Kotlin coroutines is observability, such as measuring execution time, recording latency buckets, and publishing statistical metrics. Statistical metrics such as histograms and summaries are both essential and complex. For instance, we need them to calculate percentiles, e.g., 90p and 99p, and to compute the percentage of requests that fall under a time-bucket criterion, e.g., less than 200 ms. Using the collected statistical metrics, we can establish a latency SLI (Service Level Indicator) and set up a latency SLO (Service Level Objective) based on Google SRE practices. We can also approximate the Apdex score from the gathered metrics.
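The sketch below illustrates these statistics on raw latency samples. It computes nearest-rank percentiles, the fraction of requests under a latency threshold (a latency SLI), and an Apdex score with target T, where satisfied requests take at most T, tolerating ones at most 4T, and Apdex = (satisfied + tolerating / 2) / total. The sample values and function names are hypothetical. Micrometer and Prometheus do not keep every sample. They approximate percentiles on the client or from histogram buckets, so treat this as the exact reference computation rather than what those libraries do internally.

```kotlin
import kotlin.math.ceil

// Hypothetical latencies (ms) of ten requests.
val sampleLatenciesMs = listOf(120L, 150L, 180L, 210L, 950L, 130L, 170L, 190L, 400L, 110L)

// Nearest-rank percentile: the smallest sample such that at least p% of samples are <= it.
fun percentile(samplesMs: List<Long>, p: Double): Long {
    require(samplesMs.isNotEmpty() && p > 0.0 && p <= 100.0)
    val sorted = samplesMs.sorted()
    val rank = ceil(p * sorted.size / 100.0).toInt()  // 1-based rank
    return sorted[rank - 1]
}

// Latency SLI: share of requests faster than the threshold, e.g. "< 200 ms".
fun fractionBelow(samplesMs: List<Long>, thresholdMs: Long): Double =
    samplesMs.count { it < thresholdMs }.toDouble() / samplesMs.size

// Apdex with target T: satisfied <= T, tolerating in (T, 4T], frustrated above 4T.
fun apdex(samplesMs: List<Long>, targetMs: Long): Double {
    val satisfied = samplesMs.count { it <= targetMs }
    val tolerating = samplesMs.count { it > targetMs && it <= 4 * targetMs }
    return (satisfied + tolerating / 2.0) / samplesMs.size
}
```

With these samples:

- The 50p, 90p, and 99p values are 170, 400, and 950 ms.
- 70% of the requests are under 200 ms.
- The Apdex score for T = 200 ms is (7 + 2 / 2) / 10 = 0.8.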

Kotlin suspend functions are compiled differently from thread- or future-based code (e.g., methods returning a Java CompletableFuture), so the Micrometer @Timed annotation applied through Spring AOP does not measure them correctly. At the time of this writing, we cannot correctly add behavior to Kotlin coroutines via a pointcut, as we show later. Consequently, we would need to write metrics-collection code in each suspendable method. This may lead to convoluted business logic and a less maintainable codebase. This article demonstrates one approach to fix this, so that we can collect proper metrics for Kotlin coroutines with relative ease.
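The difference shows up in the JVM method signature. The Kotlin compiler turns every `suspend` function into a method that takes an extra trailing `Continuation` parameter and returns `Object`. This lets the method return a special "suspended" marker instead of the real result. A proxy that only sees the JVM method therefore cannot tell when the work has actually finished. The sketch below checks this with plain Java reflection. `ComicService` is a hypothetical stand-in for XkcdService, and the helper names are illustrative only.

```kotlin
import java.lang.reflect.Method
import kotlin.coroutines.Continuation

// Hypothetical stand-in for XkcdService; the body is irrelevant here.
class ComicService {
    suspend fun getComicById(id: String): String? = "comic-$id"
}

fun jvmMethod(name: String): Method =
    ComicService::class.java.declaredMethods.single { it.name == name }

// Renders the method as the JVM (and therefore a Spring AOP proxy) sees it.
fun jvmSignature(m: Method): String =
    "${m.returnType.simpleName} ${m.name}(${m.parameterTypes.joinToString { it.simpleName }})"

// A compiled suspend function takes a trailing Continuation and returns Object (Any?).
fun looksLikeSuspendFunction(m: Method): Boolean =
    m.parameterTypes.lastOrNull() == Continuation::class.java && m.returnType == Any::class.java
```

For `getComicById`, `jvmSignature` renders `Object getComicById(String, Continuation)`, not a method returning `String`.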

2. Experimental Setup

The following setup is used to demonstrate our case for collecting metrics in Kotlin coroutines:

  1. A simple, fully reactive Spring Boot REST API service that calls a public REST API. In this case, it calls the Xkcd REST API; xkcd is a webcomic of romance, sarcasm, math, and language. Please don't get carried away reading the comic. Or if you do, please make sure you finish reading this blog post afterwards. :-D
  2. The Micrometer Prometheus library records metrics for the application flow implemented using Kotlin coroutines. The results are visible in Prometheus.
  3. A test shows the usage and the results.

The diagram below shows the structure of the application.

Figure 1. Service X architecture: calling xkcd REST API

The system has three main components:

1. XkcdClient to perform a REST API call to Xkcd using Spring WebClient and Resilience4j.

2. XkcdService to map the response from Xkcd REST API into Service X format.

3. XkcdController to provide REST API controller implementation.

To keep the scope small, we only observe the execution time. Other metrics (e.g., histograms and buckets) are included in the implementation, and they follow the same pattern as the execution time once we measure it correctly. Thus, our goal is to measure the execution time in each component. Since each component calls the next one and waits for its result, we expect the execution times to satisfy XkcdController >= XkcdService >= XkcdClient.

3. Measuring coroutine execution incorrectly with Spring AOP

As mentioned earlier, we first apply the Micrometer @Timed annotation to Kotlin coroutines using Spring AOP, to illustrate what happens to the collected metrics. Remember that we expect the execution time to be XkcdController >= XkcdService >= XkcdClient.

As a controlled experiment, we only apply @Timed to XkcdService, while the other components are measured correctly using our metrics utility (we will visit the utility later). This lets us check the resulting metrics against our expectations, since XkcdService sits right in the middle of the flow.

3.1. Using @Timed the Java CompletableFuture way

The code in Snippet 1 applies the annotation to XkcdService the Java CompletableFuture way. We need to configure both TimedAspect and @Async, as described in the Micrometer documentation. In case you wonder why we do this: 1) to show that we should not treat a Kotlin coroutine like a Java thread, and 2) curiosity and fun :-)

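import io.micrometer.core.annotation.Timed
import kotlinx.coroutines.coroutineScope
import org.springframework.scheduling.annotation.Async
import org.springframework.stereotype.Service

@Service
class XkcdService(private val xkcdClient: XkcdClient) {

    @Async // DON'T do this: counterexample mixing a Spring executor with a coroutine
    @Timed("service.getComicById")
    suspend fun getComicById(id: String): Xkcd? = coroutineScope {
        xkcdClient
            .getComicById(id)
            ?.let { Xkcd(it.num, it.img, it.title, it.month, it.year, it.transcript) }
    }
}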

Snippet 1. @Timed annotation with @Async

The above code does not run correctly at the time of writing. This is what happens during the execution:

1. Spring threadPoolTaskExecutor executes the getComicById suspendable method.

2. Since getComicById uses coroutineScope as its coroutine builder, which inherits the caller's context, the threadPoolTaskExecutor also executes the code inside the coroutineScope. If we switch to a different dispatcher, a thread from that dispatcher's thread pool executes the code inside the coroutineScope instead.

3. In both cases, the code fails with `kotlinx.coroutines.JobCancellationException: Parent job is Cancelled` due to the mixed constructs/frameworks: the parent job cancels itself immediately after execution.

Note: this setup can sometimes trigger a Kotlin coroutines bug with a `kotlinx.coroutines.CoroutinesInternalError: Fatal exception in coroutines machinery for DispatchedContinuation` exception. In this case, the coroutine tries to continue with `Dispatchers.Unconfined` and fails to get the context.

3.2. Annotating suspendable methods with the @Timed annotation only

What if we get rid of the async stuff as follows?

import io.micrometer.core.annotation.Timed
import kotlinx.coroutines.coroutineScope
import org.springframework.stereotype.Service

@Service
class XkcdService(private val xkcdClient: XkcdClient) {

    @Timed("service.getComicById")
    suspend fun getComicById(id: String): Xkcd? = coroutineScope {
        xkcdClient
            .getComicById(id)
            ?.let { Xkcd(it.num, it.img, it.title, it.month, it.year, it.transcript) }
    }
}

Snippet 2. @Timed annotation without @Async

This is what happens during the execution:

1. The code in Snippet 2 runs, and the API returns a proper 200 response in the Swagger UI.

2. The captured metrics are inaccurate. Instead of XkcdController >= XkcdService >= XkcdClient, we get XkcdController (1023 ms) >= XkcdService (11 ms), but XkcdService (11 ms) < XkcdClient (960 ms).

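3. This happens because @Timed measures the execution only until the method reaches its first suspension point. At that point, the compiled suspend function returns to its caller (the AOP proxy), which stops the timer while the actual work is still in progress.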
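This behavior can be simulated with the standard library only. `proxyStyleTimed` is a simplified, hypothetical around-advice, not Micrometer's actual TimedAspect code. It starts a timer and invokes the suspend function in its compiled form, the way a reflective proxy does. It then stops the timer as soon as that call returns. The fake clock and the 10 ms and 950 ms durations are made up to keep the result deterministic.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.startCoroutineUninterceptedOrReturn
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

var nowMs = 0L                                   // hypothetical fake clock (ms)
var pendingResponse: Continuation<String>? = null

// Simulated client call: 10 ms of local work, then it suspends until a response is delivered.
suspend fun fakeClientCall(): String {
    nowMs += 10
    return suspendCoroutine<String> { cont -> pendingResponse = cont }
}

// Simplified around-advice: start a timer, invoke the method, stop the timer when the call returns.
// For a suspend function, the call returns COROUTINE_SUSPENDED at the first suspension point.
fun proxyStyleTimed(block: suspend () -> String, completion: Continuation<String>): Pair<Any?, Long> {
    val start = nowMs
    val returned = block.startCoroutineUninterceptedOrReturn(completion)
    return returned to (nowMs - start)
}

// Returns (did the call return the suspension marker?, time recorded by the proxy, time the work really finished).
fun runProxyDemo(): Triple<Boolean, Long, Long> {
    nowMs = 0L
    var finishedAt = -1L
    val (returned, recordedMs) = proxyStyleTimed(
        { fakeClientCall() },
        Continuation<String>(EmptyCoroutineContext) { finishedAt = nowMs }
    )
    nowMs += 950                       // network latency passes while the coroutine is suspended
    pendingResponse?.resume("comic")   // response arrives; the coroutine runs to completion
    return Triple(returned === COROUTINE_SUSPENDED, recordedMs, finishedAt)
}
```

The proxy records only 10 ms and sees the `COROUTINE_SUSPENDED` marker, while the work actually finishes at 960 ms on the fake clock. This is the same kind of mismatch as the XkcdService and XkcdClient numbers above.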

4. Measuring coroutine execution in a correct and readable way

At this point, it is time to visit the metrics utility for Kotlin coroutines mentioned in Section 3. The idea is pretty straightforward. If we encapsulate a Timer in a suspendable method, the timer's start time becomes part of that method's suspended state. Thus, the timer continues measuring after the coroutine resumes from a suspension point. We can then extract this pattern into a `suspend` function that invokes the `suspend` function to be measured.

Idea: wrap a suspendable method execution inside another suspendable method with a timer in it.
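The same kind of standard-library simulation can check this idea. `timed` is a hypothetical, stripped-down stand-in for the metrics utility in Snippet 3: a fake clock replaces the Micrometer clock and a map replaces the MeterRegistry. Because `timed` is itself a suspend function, its `start` value is kept in the coroutine's state. Its `finally` block runs only after the wrapped block has resumed and completed. Nesting three hypothetical layers also reproduces the expected ordering XkcdController >= XkcdService >= XkcdClient.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

var nowMs = 0L                                      // hypothetical fake clock (ms)
val recordedMs = linkedMapOf<String, Long>()        // stand-in for a MeterRegistry
var pendingResponse: Continuation<String>? = null

// Wrap a suspend block in another suspend function that owns the timer.
suspend fun <T> timed(name: String, block: suspend () -> T): T {
    val start = nowMs                    // kept in the coroutine state across suspensions
    try {
        return block()
    } finally {
        recordedMs[name] = nowMs - start // runs after the block has resumed and finished
    }
}

// Hypothetical layers mirroring XkcdClient, XkcdService, and XkcdController.
suspend fun client(): String = timed("client") {
    nowMs += 10
    suspendCoroutine<String> { cont -> pendingResponse = cont }  // simulated HTTP call
}
suspend fun service(): String = timed("service") { nowMs += 2; client() }
suspend fun controller(): String = timed("controller") { nowMs += 3; service() }

fun runWrapperDemo(): Map<String, Long> {
    nowMs = 0L
    recordedMs.clear()
    val request: suspend () -> String = { controller() }
    request.startCoroutine(Continuation(EmptyCoroutineContext) { })
    nowMs += 950                        // network latency while all layers are suspended
    pendingResponse?.resume("comic")    // response arrives; all three layers complete
    return recordedMs.toMap()
}
```

The demo records client = 960, service = 962, and controller = 965 on the fake clock. Every duration includes the 950 ms spent suspended, which is what request latency should include.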

4.1. Implementing the metrics library

We can implement any complex measurement using this approach, such as percentiles and time buckets. Moreover, we can add labels and tags as needed. The code in Snippet 3 shows one possible implementation of the idea using Micrometer Prometheus, minus error handling. Unit tests are also available in the GitHub repo.

There are two variants of the implementation:

  1. coroutineMetrics receives a suspendable method returning a non-nullable value as a parameter.
  2. coroutineMetricsWithNullable invokes a suspendable method returning a nullable value.

A custom statisticTimerBuilder constructs a Timer with customizable tags and buckets. The execution time is measured with the meter registry's monotonic clock. The start time is read before invoking the suspendable method. The elapsed time is recorded in the Timer in a finally block, i.e., after the suspendable method returns a value or throws.

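import io.micrometer.core.instrument.MeterRegistry
import java.time.Duration
import java.util.concurrent.TimeUnit

// statisticTimerBuilder and DEFAULT_TIME_BUCKETS are helpers from the accompanying repository (not shown here).

// Variant for suspendable functions that return a non-null value.
suspend fun <T: Any> coroutineMetrics(
    suspendFunc: suspend () -> T,
    functionName: String,
    moreTags: Map<String, String> = emptyMap(),
    timeBuckets: Array<Duration> = DEFAULT_TIME_BUCKETS,
    meterRegistry: MeterRegistry
): T = coroutineMetricsWithNullable(suspendFunc, functionName, moreTags, timeBuckets, meterRegistry)!!

// Variant for suspendable functions that may return null.
suspend fun <T: Any> coroutineMetricsWithNullable(
    suspendFunc: suspend () -> T?,
    functionName: String,
    moreTags: Map<String, String> = emptyMap(),
    timeBuckets: Array<Duration> = DEFAULT_TIME_BUCKETS,
    meterRegistry: MeterRegistry
): T? {
    require(timeBuckets.isNotEmpty()) { "timeBuckets are mandatory to create latency distribution histogram" }
    val timer = statisticTimerBuilder(
        metricsLabelTag = functionName,
        moreTags = moreTags,
        timeBuckets = timeBuckets
    )
        .register(meterRegistry)
    val clock = meterRegistry.config().clock()
    // The start time is part of this coroutine's state, so it survives any suspension inside suspendFunc.
    val start = clock.monotonicTime()
    try {
        return suspendFunc.invoke()
    } finally {
        // Runs once suspendFunc has resumed and returned (or thrown), so suspended time is included.
        val end = clock.monotonicTime()
        timer.record(end - start, TimeUnit.NANOSECONDS)
    }
}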

Snippet 3. Metrics library implementation

4.2. Using the metrics library

We can then use the functions we defined in Snippet 3 as follows:

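import io.micrometer.core.instrument.MeterRegistry
import kotlinx.coroutines.coroutineScope
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service

@Service
class XkcdService(
    private val xkcdClient: XkcdClient,
    private val meterRegistry: MeterRegistry
) {
    private val logger = LoggerFactory.getLogger(XkcdService::class.java)

    suspend fun getComicById(id: String): Xkcd? = coroutineScope {
        // The whole client call, including its suspension, is timed by the wrapper.
        coroutineMetricsWithNullable(
            suspendFunc = suspend {
                logger.debug("Thread XkcdClient: ${Thread.currentThread().name}")
                xkcdClient
                    .getComicById(id)
                    ?.let { Xkcd(it.num, it.img, it.title, it.month, it.year, it.transcript) }
            },
            functionName = "service.getComicById",
            meterRegistry = meterRegistry
        )
    }
}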

Snippet 4. Using the metrics library

The following steps take place during the execution of the API call in Snippet 4:

1. The code does an API call and returns a proper 200 response in Swagger.

2. The metrics are accurate. Remember that we expect XkcdController >= XkcdService >= XkcdClient. We get XkcdController (1003 ms) >= XkcdService (1000 ms) >= XkcdClient (942 ms).

3. This means that the Timer kept running while the coroutine was suspended and stopped only after the coroutine resumed and completed. It therefore measures the full elapsed time of the call, including the time spent waiting at suspension points.

Although the approach works, we still have to consider structured concurrency per use case, e.g., by testing the library inside a supervisorScope, and check whether the approach works as-is or needs to be adjusted. I leave this as an exercise for the reader, as it may differ per use case.

5. Raw metrics

This section shows some raw measurements from the example in Section 4. We see that the metrics library captured 50p, 90p, and 99p statistics. The service itself is rather slow but please ask Xkcd for the reason. ;-)

service_getComicById_statistic_seconds{service="service.getComicById",quantile="0.5",} 0.142606336
service_getComicById_statistic_seconds{service="service.getComicById",quantile="0.9",} 0.142606336
service_getComicById_statistic_seconds{service="service.getComicById",quantile="0.99",} 0.142606336

6. Conclusions

  1. The example demonstrated how we can measure asynchronous execution in Kotlin coroutines. To perform this task, we followed the general pattern of the Kotlin coroutines library.
  2. Beware of mixing executors and dispatchers for Kotlin coroutines. Only do this when you know exactly what it entails.
  3. @Timed and @Async are suitable for Java CompletableFuture. Thus, as an alternative, we can transform a `suspend` function into a regular method returning a CompletableFuture via the kotlinx-coroutines-jdk8 library (using GlobalScope.future). However, if we use CompletableFuture, we might be better off implementing concurrency using Java and its libraries from the very beginning. This removes the extra code and the overhead of bridging between constructs/frameworks.
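The next sketch shows what turning a `suspend` function into a method that returns a CompletableFuture amounts to. It uses a hand-rolled bridge built only on the standard library and the JDK. It is not `GlobalScope.future` from kotlinx.coroutines, which also handles dispatchers, cancellation, and scopes. `asCompletableFuture` is a hypothetical helper name.

```kotlin
import java.util.concurrent.CompletableFuture
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical bridge: start the suspend block and complete the future from its completion continuation.
fun <T> (suspend () -> T).asCompletableFuture(): CompletableFuture<T> {
    val future = CompletableFuture<T>()
    startCoroutine(Continuation(EmptyCoroutineContext) { result ->
        result.fold(
            onSuccess = { value -> future.complete(value) },
            onFailure = { error -> future.completeExceptionally(error) }
        )
    })
    return future
}
```

The resulting method returns a CompletableFuture, the return type that the conclusion above describes as suitable for @Timed and @Async. It pays for that with the bridging code and overhead mentioned in the same point.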