Structured Task Scope – New Concurrency Model in JVM
Alongside virtual threads, Java has introduced a new and nice concurrency model, structured concurrency, whose main API is StructuredTaskScope. In this post we will look at one pattern in this new model.
First, virtual threads are very cheap, so we don’t need to pool them. Second, when a virtual thread blocks on an operation, it is unmounted from its underlying platform (carrier) thread and its stack is moved to the heap; when the operation completes, it can be mounted again on any available carrier thread. This is new in Java, and it’s great.
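To make the "no pooling" point concrete, here is a minimal sketch, assuming Java 21 or later, where virtual threads are a final feature. The class and method names (VirtualThreadsDemo, runBlockingTasks) are made up for illustration. Every task gets its own brand-new virtual thread and simply blocks in sleep().

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class VirtualThreadsDemo {

    // Runs taskCount blocking tasks, one new virtual thread per task,
    // and returns how many of them completed.
    static int runBlockingTasks(int taskCount) {
        AtomicInteger completed = new AtomicInteger();
        // No pool here: the executor starts a fresh virtual thread for every task.
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < taskCount; i++) {
                executor.submit(() -> {
                    // Blocking unmounts the virtual thread; the carrier thread stays free.
                    Thread.sleep(Duration.ofMillis(10));
                    return completed.incrementAndGet();
                });
            }
        } // close() waits until all submitted tasks have finished
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed = " + runBlockingTasks(10_000));
    }
}
```

Starting the same number of platform threads would be far more expensive, which is why the classic advice was to pool them.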
Consider the following code snippets:
// RandomUtils is assumed to be org.apache.commons.lang3.RandomUtils (Apache Commons Lang)
public static Weather readWeatherFromServerA() throws InterruptedException {
    // simulate network latency
    Thread.sleep(RandomUtils.nextLong(1, 100));
    return new Weather("Partly Sunny", "server-A");
}
public static Weather readWeatherFromServerB() throws InterruptedException {
    Thread.sleep(RandomUtils.nextLong(1, 100));
    return new Weather("Partly Sunny", "server-B");
}
public static Weather readWeatherFromServerC() throws InterruptedException {
    Thread.sleep(RandomUtils.nextLong(1, 100));
    return new Weather("Partly Sunny", "server-C");
}
The above code returns weather information. Each method mimics a server that responds in 1–100 ms. Our requirement is to query all three servers and use whichever result arrives first. As soon as it arrives, our logic should cancel the other two requests (by interrupting them).
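The snippets call new Weather(...) and static methods on Weather, but the type itself is not shown. Here is a minimal sketch of what it might look like. It is an assumption, not the original class: the record component names (condition, server) and the simulateCall helper are hypothetical, the method names follow what the readWeather() code calls, and ThreadLocalRandom replaces RandomUtils so the sketch has no external dependency.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical shape of the Weather type; the component names are assumptions.
record Weather(String condition, String server) {

    // Simulates a remote call that takes between 1 and 100 ms.
    private static Weather simulateCall(String server) throws InterruptedException {
        Thread.sleep(ThreadLocalRandom.current().nextLong(1, 101)); // upper bound is exclusive
        return new Weather("Partly Sunny", server);
    }

    static Weather readWeatherFromServerA() throws InterruptedException {
        return simulateCall("server-A");
    }

    static Weather readWeatherFromServerB() throws InterruptedException {
        return simulateCall("server-B");
    }

    static Weather readWeatherFromServerC() throws InterruptedException {
        return simulateCall("server-C");
    }
}
```

Because each method blocks in Thread.sleep(), it reacts to interruption right away, which is what makes cancelling the slower requests possible.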
Let’s write this logic with the help of the StructuredTaskScope object.
public static Weather readWeather() throws ExecutionException, InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<Weather>()) {
        // fork one task per server; fork() accepts a Callable
        scope.fork(() -> Weather.readWeatherFromServerA());
        scope.fork(() -> Weather.readWeatherFromServerB());
        scope.fork(() -> Weather.readWeatherFromServerC());
        // now we block; blocking is cheap with virtual threads,
        // so no issue here
        scope.join();
        // result of the first task that completed successfully
        Weather weather = scope.result();
        return weather;
    }
}
The pattern is quite simple; we have used the ShutdownOnSuccess variation of StructuredTaskScope, which is provided out of the box by the JDK.
We just submit the three tasks one by one to fetch weather from different servers. The fork() method accepts a Callable and runs it in a new virtual thread.
Then we block with the help of scope.join(). Why do we block? Why not? Blocking is cheap with virtual threads, and we should encourage developers to block.
Finally, we call scope.result(), which returns the result of the first task that completed successfully.
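Cancelling the slower requests relies on plain thread interruption. The sketch below is not the scope's implementation, just an illustration of the mechanism using only final APIs (Java 21 or later). InterruptDemo and interruptSleepingTask are made-up names. It shows what a task blocked in sleep() sees when it gets interrupted.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptDemo {

    // Starts a virtual thread that blocks in sleep(), interrupts it,
    // and reports whether the task observed the interruption.
    static boolean interruptSleepingTask() throws InterruptedException {
        AtomicBoolean interrupted = new AtomicBoolean(false);
        Thread worker = Thread.startVirtualThread(() -> {
            try {
                Thread.sleep(10_000); // stands in for a slow server call
            } catch (InterruptedException e) {
                interrupted.set(true); // the blocking call ended early
            }
        });
        worker.interrupt(); // what cancellation does to a task that lost the race
        worker.join();      // returns quickly instead of after 10 seconds
        return interrupted.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("interrupted = " + interruptSleepingTask());
    }
}
```

A task that never blocks and never checks its interrupt status would keep running, so tasks submitted to a scope should be written to respond to interruption.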
Now let’s see which request wins and what happens to the other two.
public static Weather readWeather() throws InterruptedException, ExecutionException {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<Weather>()) {
        // keep the Future returned by each fork() so we can inspect it later
        Future<Weather> futureA = scope.fork(Weather::readWeatherFromServerA);
        Future<Weather> futureB = scope.fork(Weather::readWeatherFromServerB);
        Future<Weather> futureC = scope.fork(Weather::readWeatherFromServerC);
        scope.join();
        // print how each task ended once the scope has shut down
        System.out.println("futureA.state() = " + futureA.state());
        System.out.println("futureB.state() = " + futureB.state());
        System.out.println("futureC.state() = " + futureC.state());
        var weather = scope.result();
        return weather;
    }
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
    var weather = Weather.readWeather();
}
Let’s run this programme quite a few times.
WARNING: Using incubator modules: jdk.incubator.concurrent
futureA.state() = FAILED
futureB.state() = FAILED
futureC.state() = SUCCESS
futureA.state() = FAILED
futureB.state() = SUCCESS
futureC.state() = FAILED
We can see that a different server returns success on each run, while the other two fail. Printing the states is just for explanation; in real code, we just need to query the different servers as below.
public static Weather readWeather() throws ExecutionException, InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<Weather>()) {
        // fork one task per server; fork() accepts a Callable
        scope.fork(() -> Weather.readWeatherFromServerA());
        scope.fork(() -> Weather.readWeatherFromServerB());
        scope.fork(() -> Weather.readWeatherFromServerC());
        // now we block; blocking is cheap with virtual threads,
        // so no issue here
        scope.join();
        Weather weather = scope.result();
        return weather;
    }
}
And by the way, the above code can be written even more clearly with the help of method references.
public static Weather readWeather() throws InterruptedException, ExecutionException {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<Weather>()) {
        scope.fork(Weather::readWeatherFromServerA);
        scope.fork(Weather::readWeatherFromServerB);
        scope.fork(Weather::readWeatherFromServerC);
        scope.join();
        var weather = scope.result();
        return weather;
    }
}
How is it different from the ExecutorService?
An ExecutorService usually lives as long as the application does. It is created when the application starts and shut down when the application terminates.
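For comparison, here is a sketch of the same "first result wins" requirement on a classic, long-lived ExecutorService, using invokeAny(), which returns one successful result and cancels the tasks that have not finished. The names InvokeAnyDemo, POOL and readFrom are made up for this example, and the Weather record is a stand-in with assumed component names.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;

class InvokeAnyDemo {

    // Stand-in for the article's Weather type (component names are assumptions).
    record Weather(String condition, String server) {}

    // Long-lived pool: created once and shared by every request.
    static final ExecutorService POOL = Executors.newFixedThreadPool(3);

    static Weather readFrom(String server) throws InterruptedException {
        Thread.sleep(ThreadLocalRandom.current().nextLong(1, 101)); // simulated latency
        return new Weather("Partly Sunny", server);
    }

    static Weather readWeather() throws InterruptedException, ExecutionException {
        List<Callable<Weather>> tasks = List.of(
                () -> readFrom("server-A"),
                () -> readFrom("server-B"),
                () -> readFrom("server-C"));
        // Returns a successful result and cancels the tasks still running.
        return POOL.invokeAny(tasks);
    }

    public static void main(String[] args) throws Exception {
        try {
            System.out.println(readWeather());
        } finally {
            POOL.shutdown(); // the pool outlives requests, so it is shut down separately
        }
    }
}
```

Note how the pool has its own lifecycle, separate from the method that uses it. That separation is exactly what the scope-per-call style removes.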
But here, for every request call, a StructuredTaskScope is created and then closed as soon as we exit the try block. We can do it because virtual threads are cheap, often quoted as around 1000 times cheaper than platform threads.
Secondly, the above code can also be written with the CompletionStage APIs (CompletableFuture), but with those we need to welcome callback hell and deeply nested code. Who likes callback hell? Nobody.
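To give a feel for the callback style, here is a rough sketch of the same race written with CompletableFuture.anyOf(). It is only an illustration: CompletableFutureDemo and readFrom are made-up names, and the Weather record is a stand-in with assumed component names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;

class CompletableFutureDemo {

    // Stand-in for the article's Weather type (component names are assumptions).
    record Weather(String condition, String server) {}

    static Weather readFrom(String server) {
        try {
            Thread.sleep(ThreadLocalRandom.current().nextLong(1, 101)); // simulated latency
        } catch (InterruptedException e) {
            // Supplier cannot throw checked exceptions, so we have to wrap it.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new Weather("Partly Sunny", server);
    }

    static Weather readWeather() {
        CompletableFuture<Weather> a = CompletableFuture.supplyAsync(() -> readFrom("server-A"));
        CompletableFuture<Weather> b = CompletableFuture.supplyAsync(() -> readFrom("server-B"));
        CompletableFuture<Weather> c = CompletableFuture.supplyAsync(() -> readFrom("server-C"));
        // anyOf completes with whichever future finishes first, typed as Object.
        return CompletableFuture.anyOf(a, b, c)
                .thenApply(result -> (Weather) result)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(readWeather());
    }
}
```

Even in this small case we need a cast and an exception wrapper, and the two slower tasks are not cancelled: they keep running after anyOf() completes. Calling cancel() on a CompletableFuture does not interrupt the running task either.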
We will look into more such patterns. It’s a completely new concurrency model, and we need to see exactly what patterns are going to become the norm, but I think that this is the pattern that we are going to use.
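One reminder: StructuredTaskScope is not a final API yet. The code above works on Java 19 and 20, where it lives in the incubator module jdk.incubator.concurrent (hence the warning in the output) and has to be enabled with --add-modules jdk.incubator.concurrent, typically together with --enable-preview because virtual threads are a preview feature in those releases. From Java 21 onwards it is a preview API in java.util.concurrent, and fork() returns a Subtask instead of a Future, so the code needs small changes there.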
This is just the start. In upcoming blogs, we will see some variations of StructuredTaskScope. Stay tuned.