I'm working on a Java application where I periodically check a MongoDB collection (CacheBilgi) and enqueue new documents into a concurrent queue (cacheInfoQueue). The queue then has to be processed with at most two tasks running concurrently, and each time the method checks the queue it should also pick up any data that has arrived in the meantime.
Each task runs two processing functions (fundCardCacheEvictionPolicy and fundCardCacheGeneration). Both should draw on a single shared thread pool so that idle threads can help with other tasks instead of sitting unused. Once all processing is complete the thread pool should shut down, but it should be recreated when the queue fills up again.
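To make that flow concrete, here's roughly what I'm aiming for per scheduler run. This is only a sketch, not code I have: drainQueueOnce and processItem are placeholder names, the concrete Executors choices and the 10-minute timeout are just one option I'm considering, and the java.util.concurrent imports are assumed.

```java
// Sketch of the per-tick flow I'm after: build pools only when there is work,
// cap concurrent items at two, share one worker pool between both processing
// steps, and tear everything down once the queue is drained. processItem is a
// hypothetical stand-in for fundCardCacheEvictionPolicy + fundCardCacheGeneration.
private void drainQueueOnce() throws InterruptedException {
    if (cacheInfoQueue.isEmpty()) {
        return;                                                      // nothing to do, no pools created
    }
    ExecutorService itemExecutor = Executors.newFixedThreadPool(2);  // at most 2 items in flight
    ExecutorService workerPool = Executors.newWorkStealingPool();    // shared by both processing steps
    try {
        CacheBilgi cacheData;
        while ((cacheData = cacheInfoQueue.poll()) != null) {
            CacheBilgi item = cacheData;                             // effectively final for the lambda
            itemExecutor.submit(() -> processItem(item, workerPool));
        }
    } finally {
        itemExecutor.shutdown();
        itemExecutor.awaitTermination(10, TimeUnit.MINUTES);         // let in-flight items finish
        workerPool.shutdown();                                       // both pools rebuilt on the next tick
    }
}
```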
Currently, I'm using ForkJoinPool in multiple places, which might be inefficient:
- In checkCacheListQueue(), I create and shut down a new ForkJoinPool on every pass through the while loop (effectively one pool per queue element), which seems wasteful.
- In fundCardCacheEvictionPolicy and fundCardCacheGeneration, I create yet another ForkJoinPool on every call, so I end up with several pools instead of one shared pool.
- Under system load, the computed thread count is sometimes lower than ideal, so fewer tasks run in parallel than I expect.
Questions:
- Would ExecutorService be a better fit here than ForkJoinPool? Given that the workload is sometimes just a handful of lightweight operations, would a dedicated ExecutorService be overkill in terms of resource consumption?
- If I use ExecutorService, should I manually split the lists into batches of 2 both for queue processing and inside fundCardCacheEvictionPolicy and fundCardCacheGeneration, or is there a better way to control parallelism? (I've sketched the alternative I'm considering after the current implementation below.)
- What's the best way to let idle threads pick up work from other tasks (work stealing) instead of having them wait only on their own task to complete? (See the sketch right after this list.)
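To make question 3 concrete, this is the kind of sharing I have in mind for the two processing methods: they would receive one pool instead of each building their own. Sketch only; the extra ForkJoinPool parameter and the throws clause are not in my current code.

```java
// Both methods run their parallel stream inside the same ForkJoinPool, so its idle
// workers can steal per-code work from any task submitted to that pool instead of
// being confined to a pool that only one method uses.
public void fundCardCacheEvictionPolicy(CacheBilgi cacheData, Set<String> updatedCodes,
                                        ForkJoinPool sharedPool) throws Exception {
    sharedPool.submit(() -> {
        String servis = cacheData.getServis();
        cacheData.getCode().parallelStream().forEach(code -> {
            switch (servis) {
                //..
            }
        });
    }).get();
}

public void fundCardCacheGeneration(Set<String> codes, ForkJoinPool sharedPool) throws Exception {
    sharedPool.submit(() ->
            codes.parallelStream().forEach(code -> {
                //..
            })
    ).get();
}
```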
Here is my current implementation:
```java
@Service
@RequiredArgsConstructor
public class xxx extends BaseService {

    private MongoCollection<Document> cacheInfoColl;
    private MongoCollection<Document> serviceInfoColl;

    private final ConcurrentLinkedQueue<CacheBilgi> cacheInfoQueue = new ConcurrentLinkedQueue<>();

    @PostConstruct
    private void init() {
        this.cacheInfoColl = this.mongoTemplate.getCollection(Cons.CACHE_BILGI);
        this.serviceInfoColl = this.mongoTemplate.getCollection(Cons.SERVIS_BILGI);
    }

    @Scheduled(fixedDelay = 5000)
    public void fundCardCacheProgress() {
        Query query = new Query();
        query.addCriteria(Criteria.where("is_cached").is(Boolean.FALSE));
        query.addCriteria(Criteria.where("is_locked").is(Boolean.FALSE));
        List<CacheBilgi> cacheList = mongoTemplate.find(query, CacheBilgi.class, Cons.CACHE_BILGI);

        for (CacheBilgi cacheData : cacheList) {
            cacheInfoColl.updateOne(Filters.eq("_id", cacheData.getId()),
                    Updates.set("is_locked", Boolean.TRUE));
            cacheInfoQueue.offer(cacheData);
        }
        checkCacheListQueue();
    }

    public void checkCacheListQueue() {
        while (!cacheInfoQueue.isEmpty()) {
            int threadCountForList = cacheInfoQueue.size() >= 2 ? 2 : 1;
            ForkJoinPool customThreadPoolForList = new ForkJoinPool(threadCountForList);
            try {
                customThreadPoolForList.submit(() -> {
                    CacheBilgi cacheData = cacheInfoQueue.poll();
                    Set<String> updatedCodes = ConcurrentHashMap.newKeySet();
                    fundCardCacheEvictionPolicy(cacheData, updatedCodes);
                    fundCardCacheGeneration(updatedCodes);
                    cacheInfoColl.updateOne(Filters.eq("_id", cacheData.getId()),
                            Updates.set("is_cached", true));
                }).get();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                customThreadPoolForList.shutdown();
            }
        }
    }

    public void fundCardCacheEvictionPolicy(CacheBilgi cacheData, Set<String> updatedCodes) {
        int threadCount = (int) Math.max(1, Runtime.getRuntime().availableProcessors() * 0.85);
        ForkJoinPool customThreadPool = new ForkJoinPool(threadCount);
        try {
            customThreadPool.submit(() -> {
                String servis = cacheData.getServis();
                List<String> codes = cacheData.getCode();
                codes.parallelStream().forEach(code -> {
                    switch (servis) {
                        //..
                    }
                });
            }).get();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            customThreadPool.shutdown();
        }
    }

    public void fundCardCacheGeneration(Set<String> codes) {
        int threadCount = (int) Math.max(1, Runtime.getRuntime().availableProcessors() * 0.85);
        ForkJoinPool customThreadPool = new ForkJoinPool(threadCount);
        try {
            customThreadPool.submit(() ->
                    codes.parallelStream().forEach(code -> {
                        //..
                    })
            ).get();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            customThreadPool.shutdown();
        }
    }
}
```
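For question 2, the alternative I've been considering for checkCacheListQueue is similar to the sketch at the top, but it turns every pending queue item into a Callable and hands the whole list to a two-thread pool via invokeAll, so the pool bounds concurrency and no manual slicing into batches of 2 is needed. Is this a reasonable direction? (Sketch only; error handling is kept minimal and the java.util.concurrent imports are assumed.)

```java
// Candidate replacement for checkCacheListQueue(): one short-lived pool per drain,
// at most two items processed concurrently, no manual batching.
public void checkCacheListQueue() {
    List<Callable<Void>> tasks = new ArrayList<>();
    CacheBilgi polled;
    while ((polled = cacheInfoQueue.poll()) != null) {
        CacheBilgi cacheData = polled;                       // effectively final for the lambda
        tasks.add(() -> {
            Set<String> updatedCodes = ConcurrentHashMap.newKeySet();
            fundCardCacheEvictionPolicy(cacheData, updatedCodes);
            fundCardCacheGeneration(updatedCodes);
            cacheInfoColl.updateOne(Filters.eq("_id", cacheData.getId()),
                    Updates.set("is_cached", true));
            return null;
        });
    }
    if (tasks.isEmpty()) {
        return;                                              // nothing to do, no pool created
    }
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
        executor.invokeAll(tasks);                           // blocks until every item is done
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    } finally {
        executor.shutdown();                                 // recreated on the next drain
    }
}
```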