How to properly use SimpleAsyncTaskExecutor when its needed to wait for all virtual threads to finish?
Description I have a Scheduled code that runs every night. This code, makes a bunch of queries to the database and put data to a ConcurrentHashMap. This ConcurrentHashMap contains data that is then used to send via email to users.
Problem
The problem is that the old code runs in SEQUENCE. I want the code to make the queries in parallel and fill the ConcurrentHashMap as they finish. Once they all finish their job, I want to send the emails sequentially.
Current Attempt After a lot of research I figured I should use ExecutorCompletionService with SimpleAsyncTaskExecutor However, Im uncertain how to figure it out when the threads finish their job.
QUESTIONS
- Is there a better way to do this? I want to wait for all virtual threads (VT) to finish before sending the emails with the data. Because until all VT's are done, the data is not ready to be sent.
Current code
@Component
@Configuration
public class NotificacoesListener {
...
private ExecutorCompletionService getTaskExecutor() {
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.setVirtualThreads(true);
taskExecutor.setConcurrencyLimit(4);
ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(taskExecutor);
return executorCompletionService;
}
...
@Scheduled(cron = MySettings.NOTIFICATION_SUB_SYSTEM_EVERY_NIGHT, zone = "GMT-3:00")
@Transactional
public void start() {
System.out.println("Starting at " + Utils.formatDate(LocalDate.now()) + " ...");
...
ConcurrentHashMap<User, String> map = new ConcurrentHashMap<>();
ExecutorCompletionService executorCompletionService = getTaskExecutor();
int virtualThreadsRunning = 0;
//
for (Verifica instancia : instancias) {
virtualThreadsRunning++;
executorCompletionService.submit(new Callable() {
@Override
public Object call() throws Exception {
instancia.verifica(tenants, map);
return true;
}
});
}
while (virtualThreadsRunning > 0) {
try {
executorCompletionService.take();
} catch (InterruptedException ex) {
ex.printStackTrace();
} finally {
virtualThreadsRunning--;
}
}
sendEmails(map);
System.out.println("END");
}
}
How to properly use SimpleAsyncTaskExecutor when its needed to wait for all virtual threads to finish?
Description I have a Scheduled code that runs every night. This code, makes a bunch of queries to the database and put data to a ConcurrentHashMap. This ConcurrentHashMap contains data that is then used to send via email to users.
Problem
The problem is that the old code runs in SEQUENCE. I want the code to make the queries in parallel and fill the ConcurrentHashMap as they finish. Once they all finish their job, I want to send the emails sequentially.
Current Attempt After a lot of research I figured I should use ExecutorCompletionService with SimpleAsyncTaskExecutor However, Im uncertain how to figure it out when the threads finish their job.
QUESTIONS
- Is there a better way to do this? I want to wait for all virtual threads (VT) to finish before sending the emails with the data. Because until all VT's are done, the data is not ready to be sent.
Current code
@Component
@Configuration
public class NotificacoesListener {
...
private ExecutorCompletionService getTaskExecutor() {
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.setVirtualThreads(true);
taskExecutor.setConcurrencyLimit(4);
ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(taskExecutor);
return executorCompletionService;
}
...
@Scheduled(cron = MySettings.NOTIFICATION_SUB_SYSTEM_EVERY_NIGHT, zone = "GMT-3:00")
@Transactional
public void start() {
System.out.println("Starting at " + Utils.formatDate(LocalDate.now()) + " ...");
...
ConcurrentHashMap<User, String> map = new ConcurrentHashMap<>();
ExecutorCompletionService executorCompletionService = getTaskExecutor();
int virtualThreadsRunning = 0;
//
for (Verifica instancia : instancias) {
virtualThreadsRunning++;
executorCompletionService.submit(new Callable() {
@Override
public Object call() throws Exception {
instancia.verifica(tenants, map);
return true;
}
});
}
while (virtualThreadsRunning > 0) {
try {
executorCompletionService.take();
} catch (InterruptedException ex) {
ex.printStackTrace();
} finally {
virtualThreadsRunning--;
}
}
sendEmails(map);
System.out.println("END");
}
}
Share
Improve this question
edited Mar 17 at 1:51
KenobiBastila
asked Mar 16 at 15:00
KenobiBastilaKenobiBastila
7706 gold badges18 silver badges57 bronze badges
1
- 1 What exactly is the problem faced with the code shared except for what you intend to try? /Hint: You can have logs per callable to visualise the execution much better by yourself. [Info: such as thread id and your task id should reflect better.] – Naman Commented Mar 16 at 19:00
1 Answer
Reset to default 1Your design is correct in a sense that it 1) implements parallelization of instancia.verifica
calls, 2) waits for completion of all submitted tasks, and 3) restricts the number of parallel threads, but might look a bit overengineered.
The simplest way is to use plain old ThreadPoolExecutor
with fixed number of threads and wait for its completion:
final ExecutorService executorCompletionService = Executors.newFixedThreadPool(4);
for (Verifica instancia : instancias) {
executorCompletionService.submit(new Callable() {
@Override
public Object call() throws Exception {
instancia.verifica(tenants, map);
return true;
}
});
}
executorCompletionService.shutdown();
executorCompletionService.awaitTermination( /* wait for awhile*/);
This solution does not use virtual threads, but I don't think you really need them for the scenario you described (nightly cron, long running I/O intensive queries). If you still need them, you could use instead
Executors.newFixedThreadPool(4, Thread.ofVirtual().factory());
Pooling of virtual threads is not recommended, but my answer to In java How to migrate from Executors.newFixedThreadPool(MAX_THREAD_COUNT()) to Virtual Thread thread argues that there is no harm in doing this.
The recommended way of using virtual threads and to limit concurrency is to use Semaphore
. In your case it might look like
final Thread[] threads = new Thread[...];
final Semaphore sem = new Semaphore(4);
for (final Verifica instancia : instancias) {
threads[i] = Thread.ofVirtual().start( () -> {
try {
sem.acquire();
instancia.verifica(tenants, map);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
sem.release();
}
});
}
for (final Thread thread : threads) {
thread.join();
}
From the synchronization standpoint all above solutions might be a bit more performant because your solution uses two extra synchronizations: the BlockingQueue
of ExecutorCompletionService
and synchronized
block on handling concurrency limit in SimpleAsyncTaskExecutor
, but, like I said, in your scenario the performance difference will most likely be microscopic.