I am currently trying to implement a process with Jakarta EE Batch. The process starts by importing approx. 1,000,000 data records from a database. This is followed by several steps in which these records are bundled and enriched with data from web services. In the first step, each processing call should handle 10,000 records at once; in the second step, 150,000. Each step is to run in parallel with 5 threads.
Configuring the individual steps and processing single DB records is not a problem. What I have not been able to figure out is how to safely read and process more than one item in a single processing call (ItemProcessor).
The process should look like this:
1. Retrieve the data from the DB.
2. Run step 1 five times in parallel, processing 10,000 records at a time until all records have been processed.
3. Run step 2 five times in parallel, processing 150,000 records at a time until all records have been processed.
4. Export the data.
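To make the numbers in this plan concrete, here is a plain-Java sketch (no Jakarta Batch API involved) of one common way to spread the work: giving each of the 5 partitions a contiguous range of record indices, which a reader could then page through in batches. The class and method names (`PartitionRanges`, `split`, `batchCount`) are hypothetical, and the assumption that the records can be addressed by index is not part of the original setup.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper: splits record indices [0, total) into contiguous,
 * near-equal ranges (one per partition) and counts the batches per range.
 */
class PartitionRanges {

    /** Half-open index range [start, end) assigned to one partition. */
    record Range(int start, int end) {
        int size() { return end - start; }
    }

    static List<Range> split(int total, int partitions) {
        List<Range> ranges = new ArrayList<>(partitions);
        int base = total / partitions;      // minimum records per partition
        int remainder = total % partitions; // the first 'remainder' partitions get one extra record
        int start = 0;
        for (int p = 0; p < partitions; p++) {
            int size = base + (p < remainder ? 1 : 0);
            ranges.add(new Range(start, start + size));
            start += size;
        }
        return ranges;
    }

    /** Number of batches needed to cover 'records' items, rounding up. */
    static int batchCount(int records, int batchSize) {
        return (records + batchSize - 1) / batchSize;
    }

    public static void main(String[] args) {
        for (int batchSize : new int[] {10_000, 150_000}) {
            System.out.println("Batch size " + batchSize + ":");
            for (Range r : split(1_000_000, 5)) {
                System.out.println("  " + r + " -> " + batchCount(r.size(), batchSize) + " batch(es)");
            }
        }
    }
}
```

With 5 equal ranges of 200,000 records, step 1 yields 20 full batches per partition. Step 2, however, yields 2 batches per partition (150,000 + 50,000), i.e. 10 batches in total instead of the 7 that a single shared pool of records would produce, so fixed ranges trade some batch fullness for not having to share state between partitions.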
<step id="getWsDataA" next="getWsDataB">
    <chunk>
        <reader ref="getWsDataAItemReader"/>
        <processor ref="getWsDataAItemMockProcessor"/>
        <writer ref="getWsDataAItemJpaWriter"/>
    </chunk>
    <partition>
        <plan partitions="5"></plan>
    </partition>
</step>
<step id="getWsDataB" next="export">
    <chunk>
        <reader ref="getWsDataAItemReader"/>
        <processor ref="getWsDataAItemMockProcessor"/>
        <writer ref="getWsDataAItemJpaWriter"/>
    </chunk>
    <partition>
        <plan partitions="5"></plan>
    </partition>
</step>
<step id="export">
    <batchlet ref="ExportBatchlet">
        <properties>
            <property name="path" value="C:\\Temp\\"/>
        </properties>
    </batchlet>
</step>
Can someone please explain how I can reliably process more than one data record in an ItemProcessor when the steps run in parallel (partitions = 5)?
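One way to get several records into a single ItemProcessor call is to let the reader return a whole `List` as one item: the processor then receives that list as its `item` argument, and the reader returns `null` once no data is left, which is how a Jakarta Batch reader signals end of data. Below is a minimal stdlib-only sketch of that grouping logic under those assumptions; `BatchingIterator` is a hypothetical name, and its `readItem()` only mimics the reader contract without being wired to the batch runtime. Since each partition normally works with its own reader instance, a reader that wraps its own source (for example its own index range) would not need to share any state with the other partitions.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/**
 * Hypothetical helper: wraps a single-record iterator and hands out
 * lists of up to batchSize records, so one "item" equals one batch.
 */
class BatchingIterator<T> implements Iterator<List<T>> {
    private final Iterator<T> source;
    private final int batchSize;

    BatchingIterator(Iterator<T> source, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be > 0");
        }
        this.source = source;
        this.batchSize = batchSize;
    }

    @Override
    public boolean hasNext() {
        return source.hasNext();
    }

    @Override
    public List<T> next() {
        if (!source.hasNext()) {
            throw new NoSuchElementException();
        }
        List<T> batch = new ArrayList<>(batchSize);
        // Fill the batch until it is full or the source is exhausted,
        // so the last batch may be smaller than batchSize.
        while (batch.size() < batchSize && source.hasNext()) {
            batch.add(source.next());
        }
        return batch;
    }

    /** Mimics ItemReader.readItem(): the next batch, or null at end of data. */
    List<T> readItem() {
        return hasNext() ? next() : null;
    }

    public static void main(String[] args) {
        var reader = new BatchingIterator<>(List.of(1, 2, 3, 4, 5).iterator(), 2);
        List<Integer> batch;
        while ((batch = reader.readItem()) != null) {
            System.out.println(batch); // prints [1, 2], then [3, 4], then [5]
        }
    }
}
```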
UPDATE
The following text was previously posted as an answer. After @Mark Rotteveel pointed this out, I deleted the answer and moved its text into the question as this update.
I tried it out based on an online tutorial. In that tutorial, a queue held in the context object stores the elements to be processed.
I don't really like my solution, but switching to a list did not work.
<?xml version="1.0" encoding="UTF-8"?>
<job id="hugeImport" xmlns="https://jakarta.ee/xml/ns/jakartaee" version="2.0">
    <step id="dummyItems" next="chunkProcessor">
        <batchlet ref="dummyItemsBatchlet">
            <properties>
                <property name="numberOfDummyItems" value="10"/>
            </properties>
        </batchlet>
    </step>
    <step id="chunkProcessor" next="reloadItemsQueue_001">
        <chunk>
            <reader ref="itemReader">
                <properties>
                    <property name="numberOfItems" value="2"/>
                </properties>
            </reader>
            <processor ref="itemMockProcessor"/>
            <writer ref="itemJpaWriter"/>
        </chunk>
        <partition>
            <plan partitions="2"></plan>
        </partition>
    </step>
    <step id="reloadItemsQueue_001" next="chunkProcessortest">
        <batchlet ref="reloadItemQueueBatchlet">
        </batchlet>
    </step>
    <step id="chunkProcessortest">
        <chunk>
            <reader ref="itemReader">
                <properties>
                    <property name="numberOfItems" value="3"/>
                </properties>
            </reader>
            <processor ref="itemMockProcessor"/>
            <writer ref="itemJpaWriter"/>
        </chunk>
        <partition>
            <plan partitions="2"></plan>
        </partition>
    </step>
</job>
public class ImportItem {
    private Long id;
    private String name;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public ImportItem(long id, String name) {
        this.id = id;
        this.name = name;
    }

    @Override
    public String toString() {
        return "ImportItem{" + "id=" + id + ", name=" + name + '}';
    }
}
import java.util.List;

public class ImportItems {
    private List<ImportItem> items;

    public List<ImportItem> getItems() {
        return items;
    }

    public void setItems(List<ImportItem> items) {
        this.items = items;
    }
}
import jakarta.batch.runtime.context.JobContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

@Named
public class ImportJobContext {
    @Inject
    private JobContext jobContext;

    // Items still to be processed in the current step
    private final Queue<ImportItem> itemsToDo = new ConcurrentLinkedQueue<>();
    // Items already handed out; they become the input of the next step
    private final Queue<ImportItem> itemsForNextStep = new ConcurrentLinkedQueue<>();

    public void addItems(List<ImportItem> items) {
        getImportJobContext().itemsToDo.addAll(items);
    }

    public void reloadQueue() {
        ImportJobContext shared = getImportJobContext();
        // Lock the shared instance, not 'this': each injection point of this
        // @Named bean gets its own instance, which only delegates to the shared one.
        synchronized (shared) {
            shared.itemsToDo.clear();
            shared.itemsToDo.addAll(shared.itemsForNextStep);
            shared.itemsForNextStep.clear();
        }
    }

    public List<ImportItem> getItems(int count) {
        ImportJobContext shared = getImportJobContext();
        synchronized (shared) {
            List<ImportItem> items = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                ImportItem item = shared.itemsToDo.poll();
                if (item == null) {
                    break; // queue is empty, no need to keep polling
                }
                items.add(item);
                shared.itemsForNextStep.add(item);
            }
            // null tells the batch runtime that the reader has no more data
            return items.isEmpty() ? null : items;
        }
    }

    // Stores the first instance in the job's transient user data and returns it,
    // so that all injected instances work on the same queues.
    private ImportJobContext getImportJobContext() {
        if (jobContext.getTransientUserData() == null) {
            jobContext.setTransientUserData(this);
        }
        return (ImportJobContext) jobContext.getTransientUserData();
    }
}
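The queue-based approach relies on several partitions pulling from one shared `ConcurrentLinkedQueue`. The following plain-Java sketch (hypothetical class `SharedQueueDemo`, with threads from an `ExecutorService` standing in for the 5 partitions; no Jakarta Batch API involved) checks the property that matters here: `poll()` is atomic, so every record is delivered to exactly one worker even without an extra lock. Wrapping the whole batch in a lock would only additionally guarantee that each batch contains consecutive queue elements.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical demo: worker threads drain one shared queue in batches. */
class SharedQueueDemo {

    /** Polls up to 'count' items; returns null if the queue was already empty. */
    static <T> List<T> pollBatch(Queue<T> queue, int count) {
        List<T> batch = new ArrayList<>(count);
        T item;
        while (batch.size() < count && (item = queue.poll()) != null) {
            batch.add(item);
        }
        return batch.isEmpty() ? null : batch; // null signals "no more data"
    }

    /** Runs 'threads' workers until the queue is empty; returns every item they received. */
    static List<Integer> drain(int total, int threads, int count) {
        Queue<Integer> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < total; i++) {
            queue.add(i);
        }
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<Future<List<Integer>>> futures = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            futures.add(pool.submit(() -> {
                List<Integer> received = new ArrayList<>();
                List<Integer> batch;
                while ((batch = pollBatch(queue, count)) != null) {
                    received.addAll(batch);
                }
                return received;
            }));
        }
        List<Integer> all = new ArrayList<>();
        try {
            for (Future<List<Integer>> f : futures) {
                all.addAll(f.get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
        return all;
    }

    public static void main(String[] args) {
        List<Integer> all = drain(100_000, 5, 10_000);
        long distinct = all.stream().distinct().count();
        System.out.println(all.size() + " items received, " + distinct + " distinct");
    }
}
```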
import jakarta.batch.api.AbstractBatchlet;
import jakarta.batch.api.BatchProperty;
import jakarta.batch.runtime.BatchStatus;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.util.ArrayList;
import java.util.List;

@Named
public class DummyItemsBatchlet extends AbstractBatchlet {
    @Inject
    private ImportJobContext jobContext;

    @Inject
    @BatchProperty
    private String numberOfDummyItems;

    @Override
    public String process() throws Exception {
        // Creates the configured number of dummy items and puts them into the shared queue
        List<ImportItem> list = new ArrayList<>();
        for (int i = 0; i < Integer.parseInt(numberOfDummyItems); i++) {
            list.add(new ImportItem(i, "dummyItem" + i));
        }
        jobContext.addItems(list);
        return BatchStatus.COMPLETED.name();
    }
}
import jakarta.batch.api.BatchProperty;
import jakarta.batch.api.chunk.AbstractItemReader;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.util.List;

@Named
public class ItemReader extends AbstractItemReader {
    @Inject
    ImportJobContext importJobContext;

    @Inject
    @BatchProperty
    private String numberOfItems;

    // Returns a whole batch as one item; null (empty queue) ends this partition's reading
    @Override
    public List<ImportItem> readItem() throws Exception {
        int numberOfWorkerItems = 2;
        if (numberOfItems != null) {
            numberOfWorkerItems = Integer.parseInt(numberOfItems);
        }
        return importJobContext.getItems(numberOfWorkerItems);
    }
}
import jakarta.batch.api.chunk.ItemProcessor;
import jakarta.inject.Named;

@Named
public class ItemMockProcessor implements ItemProcessor {
    @Override
    public Object processItem(Object o) throws Exception {
        System.out.println("--> processing " + o);
        return o;
    }
}
import jakarta.batch.api.chunk.AbstractItemWriter;
import jakarta.inject.Named;
import java.util.List;

@Named
public class ItemJpaWriter extends AbstractItemWriter {
    @Override
    @SuppressWarnings("unchecked")
    public void writeItems(List<Object> list) throws Exception {
        // Each element of the chunk is one batch (List<ImportItem>) returned by ItemReader
        for (Object obj : list) {
            List<ImportItem> items = (List<ImportItem>) obj;
            System.out.println("--> Persisting " + items);
        }
    }
}
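Because each item in this design is a whole `List<ImportItem>`, the writer receives a list of lists: with the default chunk `item-count` of 10, a single `writeItems` call can carry up to 10 batches. A small helper can turn that into one flat list before persisting and replace the unchecked cast with a checked one. `ChunkFlattener.flatten` is a hypothetical plain-Java sketch, not part of the Jakarta Batch API.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper for a writer whose items are themselves batches:
 * each element of 'chunk' is the List returned by one readItem() call.
 */
class ChunkFlattener {

    static <T> List<T> flatten(List<Object> chunk, Class<T> type) {
        List<T> records = new ArrayList<>();
        for (Object batch : chunk) {
            // Each element is expected to be a List produced by the reader/processor
            for (Object element : (List<?>) batch) {
                records.add(type.cast(element)); // fails fast on unexpected element types
            }
        }
        return records;
    }

    public static void main(String[] args) {
        List<Object> chunk = List.of(List.of("a", "b"), List.of("c"));
        System.out.println(flatten(chunk, String.class)); // prints [a, b, c]
    }
}
```

Inside `writeItems(List<Object> list)`, a call such as `flatten(list, ImportItem.class)` would then yield all records of the chunk in one list, which could be persisted in a single transaction.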
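import jakarta.batch.api.AbstractBatchlet;
import jakarta.batch.runtime.BatchStatus;
import jakarta.inject.Inject;
import jakarta.inject.Named;

@Named
public class ReloadItemQueueBatchlet extends AbstractBatchlet {
    @Inject
    private ImportJobContext jobContext;

    // Moves the items handed out in the previous step back into the to-do queue
    @Override
    public String process() throws Exception {
        System.out.println("ReloadItemQueueBatchlet.process");
        jobContext.reloadQueue();
        return BatchStatus.COMPLETED.name();
    }
}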
Can you please give me tips on how to optimize this code?