最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

java - How to properly use SimpleAsyncTaskExecutor when its needed to wait for all virtual threads to finish? - Stack Overflow

programmeradmin2浏览0评论

How to properly use SimpleAsyncTaskExecutor when its needed to wait for all virtual threads to finish?

Description I have a Scheduled code that runs every night. This code, makes a bunch of queries to the database and put data to a ConcurrentHashMap. This ConcurrentHashMap contains data that is then used to send via email to users.

Problem

The problem is that the old code runs in SEQUENCE. I want the code to make the queries in parallel and fill the ConcurrentHashMap as they finish. Once they all finish their job, I want to send the emails sequentially.

Current Attempt After a lot of research I figured I should use ExecutorCompletionService with SimpleAsyncTaskExecutor However, Im uncertain how to figure it out when the threads finish their job.

QUESTIONS

  • Is there a better way to do this? I want to wait for all virtual threads (VT) to finish before sending the emails with the data. Because until all VT's are done, the data is not ready to be sent.

Current code

@Component
@Configuration
public class NotificacoesListener {
...
    private ExecutorCompletionService getTaskExecutor() {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setVirtualThreads(true);
        taskExecutor.setConcurrencyLimit(4);

        ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(taskExecutor);

        return executorCompletionService;
    }
...

    @Scheduled(cron = MySettings.NOTIFICATION_SUB_SYSTEM_EVERY_NIGHT, zone = "GMT-3:00")
    @Transactional
    public void start() {
        System.out.println("Starting at " + Utils.formatDate(LocalDate.now()) + " ...");
        ...


        ConcurrentHashMap<User, String> map = new ConcurrentHashMap<>();

        ExecutorCompletionService executorCompletionService = getTaskExecutor();


        int virtualThreadsRunning = 0;
        //
        for (Verifica instancia : instancias) {

            virtualThreadsRunning++;
            executorCompletionService.submit(new Callable() {
                @Override
                public Object call() throws Exception {
                    instancia.verifica(tenants, map);
                    return true;
                }
            });
        }

        while (virtualThreadsRunning > 0) {
            try {
                executorCompletionService.take();
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            } finally {
                virtualThreadsRunning--;
            }
        }

        sendEmails(map);

        System.out.println("END");
    }


}

How to properly use SimpleAsyncTaskExecutor when its needed to wait for all virtual threads to finish?

Description I have a Scheduled code that runs every night. This code, makes a bunch of queries to the database and put data to a ConcurrentHashMap. This ConcurrentHashMap contains data that is then used to send via email to users.

Problem

The problem is that the old code runs in SEQUENCE. I want the code to make the queries in parallel and fill the ConcurrentHashMap as they finish. Once they all finish their job, I want to send the emails sequentially.

Current Attempt After a lot of research I figured I should use ExecutorCompletionService with SimpleAsyncTaskExecutor However, Im uncertain how to figure it out when the threads finish their job.

QUESTIONS

  • Is there a better way to do this? I want to wait for all virtual threads (VT) to finish before sending the emails with the data. Because until all VT's are done, the data is not ready to be sent.

Current code

@Component
@Configuration
public class NotificacoesListener {
...
    private ExecutorCompletionService getTaskExecutor() {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setVirtualThreads(true);
        taskExecutor.setConcurrencyLimit(4);

        ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(taskExecutor);

        return executorCompletionService;
    }
...

    @Scheduled(cron = MySettings.NOTIFICATION_SUB_SYSTEM_EVERY_NIGHT, zone = "GMT-3:00")
    @Transactional
    public void start() {
        System.out.println("Starting at " + Utils.formatDate(LocalDate.now()) + " ...");
        ...


        ConcurrentHashMap<User, String> map = new ConcurrentHashMap<>();

        ExecutorCompletionService executorCompletionService = getTaskExecutor();


        int virtualThreadsRunning = 0;
        //
        for (Verifica instancia : instancias) {

            virtualThreadsRunning++;
            executorCompletionService.submit(new Callable() {
                @Override
                public Object call() throws Exception {
                    instancia.verifica(tenants, map);
                    return true;
                }
            });
        }

        while (virtualThreadsRunning > 0) {
            try {
                executorCompletionService.take();
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            } finally {
                virtualThreadsRunning--;
            }
        }

        sendEmails(map);

        System.out.println("END");
    }


}
Share Improve this question edited Mar 17 at 1:51 KenobiBastila asked Mar 16 at 15:00 KenobiBastilaKenobiBastila 7706 gold badges18 silver badges57 bronze badges 1
  • 1 What exactly is the problem faced with the code shared except for what you intend to try? /Hint: You can have logs per callable to visualise the execution much better by yourself. [Info: such as thread id and your task id should reflect better.] – Naman Commented Mar 16 at 19:00
Add a comment  | 

1 Answer 1

Reset to default 1

Your design is correct in a sense that it 1) implements parallelization of instancia.verifica calls, 2) waits for completion of all submitted tasks, and 3) restricts the number of parallel threads, but might look a bit overengineered.

The simplest way is to use plain old ThreadPoolExecutor with fixed number of threads and wait for its completion:

    final ExecutorService executorCompletionService = Executors.newFixedThreadPool(4);

    for (Verifica instancia : instancias) {
        executorCompletionService.submit(new Callable() {
            @Override
            public Object call() throws Exception {
                instancia.verifica(tenants, map);
                return true;
            }
        });
    }

    executorCompletionService.shutdown();
    executorCompletionService.awaitTermination( /* wait for awhile*/);

This solution does not use virtual threads, but I don't think you really need them for the scenario you described (nightly cron, long running I/O intensive queries). If you still need them, you could use instead

    Executors.newFixedThreadPool(4, Thread.ofVirtual().factory());

Pooling of virtual threads is not recommended, but my answer to In java How to migrate from Executors.newFixedThreadPool(MAX_THREAD_COUNT()) to Virtual Thread thread argues that there is no harm in doing this.

The recommended way of using virtual threads and to limit concurrency is to use Semaphore. In your case it might look like

    final Thread[] threads = new Thread[...];
    final Semaphore sem = new Semaphore(4);
    for (final Verifica instancia : instancias) {
        threads[i] = Thread.ofVirtual().start( () -> {
            try {
                sem.acquire();
                instancia.verifica(tenants, map);
            } catch (InterruptedException e) {
                e.printStackTrace(); 
            } finally {
                sem.release();
            }               
        });
    }
    for (final Thread thread : threads) {
        thread.join();
    }

From the synchronization standpoint all above solutions might be a bit more performant because your solution uses two extra synchronizations: the BlockingQueue of ExecutorCompletionService and synchronized block on handling concurrency limit in SimpleAsyncTaskExecutor, but, like I said, in your scenario the performance difference will most likely be microscopic.

与本文相关的文章

发布评论

评论列表(0)

  1. 暂无评论