
jsr352 - Jakarta EE Batch - multiple elements in one ItemProcessor - Stack Overflow


I am currently trying to implement a process with Jakarta EE Batch. The process starts by importing approx. 1,000,000 data records from a database. This is followed by several steps in which these records are bundled and supplemented with data from web services. The first step can process 10,000 data records at once per processing call, the second 150,000. Each step is to run in parallel with 5 threads.

Configuring the individual steps and processing each individual DB record is not a problem. What I have not been able to figure out is how to safely retrieve and process more than one item in a single processing call (ItemProcessor).
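One way to picture the goal: rather than passing single records to the ItemProcessor, the reader groups consecutive records into a list of up to N records, and that list becomes the single "item" the processor receives. The plain-Java sketch below shows only this grouping step under that assumption; the class name `RecordBatcher` and the use of an `Iterator` as the record source are hypothetical and not part of the Jakarta Batch API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical helper: wraps an iterator of single records and yields lists of up to batchSize records.
public class RecordBatcher<T> implements Iterator<List<T>> {
    private final Iterator<T> source;
    private final int batchSize;

    public RecordBatcher(Iterator<T> source, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.source = source;
        this.batchSize = batchSize;
    }

    @Override
    public boolean hasNext() {
        return source.hasNext();
    }

    @Override
    public List<T> next() {
        if (!source.hasNext()) {
            throw new NoSuchElementException();
        }
        // Collect up to batchSize records; the last batch may be smaller.
        List<T> batch = new ArrayList<>(batchSize);
        while (batch.size() < batchSize && source.hasNext()) {
            batch.add(source.next());
        }
        return batch;
    }

    public static void main(String[] args) {
        List<Integer> records = List.of(1, 2, 3, 4, 5, 6, 7);
        RecordBatcher<Integer> batcher = new RecordBatcher<>(records.iterator(), 3);
        while (batcher.hasNext()) {
            System.out.println(batcher.next()); // prints [1, 2, 3], then [4, 5, 6], then [7]
        }
    }
}
```

In a Jakarta Batch `ItemReader`, `readItem()` could return `next()` while `hasNext()` is true and `null` afterwards, since a `null` return is how a reader signals the end of its input.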

The process should look like this:

  1. Retrieve data from DB

  2. Run step 1 in five parallel partitions, each processing 10,000 data records at a time, until all data records have been processed.

  3. Run step 2 in five parallel partitions, each processing 150,000 data records at a time, until all data records have been processed.

  4. Export data
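The numbers in the process above can be worked through as follows. With 1,000,000 records split evenly over 5 partitions, each partition owns 200,000 records, so step 1 needs 200,000 / 10,000 = 20 processor calls per partition, while step 2 needs ceil(200,000 / 150,000) = 2 calls per partition (one with 150,000 records and one with 50,000). The sketch below computes such contiguous, non-overlapping index ranges. Giving each partition a fixed range is one alternative to a shared queue, because partitions then never compete for the same records; in Jakarta Batch the ranges would typically reach each partition as partition properties, for example via a `PartitionMapper`. The names `PartitionRanges` and `Range` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits totalRecords into contiguous, non-overlapping ranges, one per partition.
public class PartitionRanges {

    // Half-open range [first, end) of record indexes owned by one partition.
    public static final class Range {
        private final long first;
        private final long end;

        Range(long first, long end) {
            this.first = first;
            this.end = end;
        }

        public long first() { return first; }
        public long end() { return end; }
        public long size() { return end - first; }

        // Processor calls needed when each call handles at most batchSize records (ceiling division).
        public long batches(long batchSize) {
            return (size() + batchSize - 1) / batchSize;
        }
    }

    public static List<Range> split(long totalRecords, int partitions) {
        List<Range> ranges = new ArrayList<>(partitions);
        long base = totalRecords / partitions;
        long remainder = totalRecords % partitions;
        long first = 0;
        for (int p = 0; p < partitions; p++) {
            // The first 'remainder' partitions take one extra record each.
            long size = base + (p < remainder ? 1 : 0);
            ranges.add(new Range(first, first + size));
            first += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (Range r : split(1_000_000, 5)) {
            // First line printed: "records 0..199999: step 1 = 20 calls, step 2 = 2 calls"
            System.out.printf("records %d..%d: step 1 = %d calls, step 2 = %d calls%n",
                    r.first(), r.end() - 1, r.batches(10_000), r.batches(150_000));
        }
    }
}
```

Fixed ranges trade some balance for simplicity: across all partitions, step 2 makes 10 calls here, whereas a single shared pool of 1,000,000 records would need only 7 (6 × 150,000 + 1 × 100,000).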

     <step id="getWsDataA" next="getWsDataB">
         <chunk>
             <reader ref="getWsDataAItemReader"/>
             <processor ref="getWsDataAItemMockProcessor"/>
             <writer ref="getWsDataAItemJpaWriter"/>
         </chunk>
         <partition>
             <plan partitions="5"></plan>
         </partition>
     </step>
    
     <step id="getWsDataB" next="export">
         <chunk>
             <reader ref="getWsDataAItemReader"/>
             <processor ref="getWsDataAItemMockProcessor"/>
             <writer ref="getWsDataAItemJpaWriter"/>
         </chunk>
         <partition>
             <plan partitions="5"></plan>
         </partition>
     </step>
    
     <step id="export">
         <batchlet ref="ExportBatchlet">
             <properties>
                 <property name="path" value="C:\\Temp\\"/>
             </properties>
         </batchlet>
     </step>
    

Can someone please explain how I can reliably process more than one data record per ItemProcessor call when the steps run in parallel (partitions = 5)?


UPDATE

The following text was previously posted as an answer. After @Mark Rotteveel pointed this out, I deleted the answer and moved its text here as an update.

I tried an approach based on an online tutorial. In that tutorial, the elements to be processed are kept in a queue inside the context object.

I don't really like my solution, but switching from the queue to a list did not work.
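The queue-based hand-off relies on `ConcurrentLinkedQueue.poll()` removing the head element atomically, so two partition threads cannot receive the same record; a plain `ArrayList` gives no such guarantee when several threads modify it at the same time. The stand-alone sketch below imitates five partitions that each repeatedly take up to N records from one shared queue, then checks that every record was handed out exactly once. It is only a stand-in: an `ExecutorService` plays the role of the batch runtime's partition threads, and `QueueDrainDemo`, `take` and `drain` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class QueueDrainDemo {

    // Takes up to 'count' records; returns null once the queue is empty (like a reader at end of input).
    static List<Integer> take(Queue<Integer> queue, int count) {
        List<Integer> batch = new ArrayList<>(count);
        Integer item;
        while (batch.size() < count && (item = queue.poll()) != null) {
            batch.add(item);
        }
        return batch.isEmpty() ? null : batch;
    }

    // Fills a queue with 'total' records, drains it with 'workers' threads in batches of 'batchSize',
    // and returns every record the threads received.
    static List<Integer> drain(int total, int workers, int batchSize) {
        Queue<Integer> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < total; i++) {
            queue.add(i);
        }
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Future<List<Integer>>> results = new ArrayList<>();
            for (int w = 0; w < workers; w++) {
                results.add(pool.submit(() -> {
                    List<Integer> seen = new ArrayList<>();
                    List<Integer> batch;
                    while ((batch = take(queue, batchSize)) != null) {
                        seen.addAll(batch); // a real processor would call the web service here
                    }
                    return seen;
                }));
            }
            List<Integer> all = new ArrayList<>();
            for (Future<List<Integer>> f : results) {
                try {
                    all.addAll(f.get());
                } catch (InterruptedException | ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            }
            return all;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> all = drain(100_000, 5, 10_000);
        long distinct = all.stream().distinct().count();
        System.out.println(all.size() + " handed out, " + distinct + " distinct"); // prints "100000 handed out, 100000 distinct"
    }
}
```

In `ImportJobContext`, this guarantee only matters if all partitions see the same queue instance, which is what keeping it in the job's transient user data is meant to achieve.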

<?xml version="1.0" encoding="UTF-8"?>
<job id="hugeImport" xmlns="https://jakarta.ee/xml/ns/jakartaee" version="2.0">
    <step id="dummyItems" next="chunkProcessor">
        <batchlet ref="dummyItemsBatchlet">
            <properties>
                <property name="numberOfDummyItems" value="10"/>
            </properties>
        </batchlet>
    </step>

    <step id="chunkProcessor" next="reloadItemsQueue_001">
        <chunk>
            <reader ref="itemReader">
                <properties>
                    <property name="numberOfItems" value="2"/>
                </properties>
            </reader>
            <processor ref="itemMockProcessor"/>
            <writer ref="itemJpaWriter"/>
        </chunk>
        <partition>
            <plan partitions="2"></plan>
        </partition>
    </step>

    <step id="reloadItemsQueue_001" next="chunkProcessortest">
        <batchlet ref="reloadItemQueueBatchlet">
        </batchlet>
    </step>

    <step id="chunkProcessortest">
        <chunk>
            <reader ref="itemReader">
                <properties>
                    <property name="numberOfItems" value="3"/>
                </properties>
            </reader>
            <processor ref="itemMockProcessor"/>
            <writer ref="itemJpaWriter"/>
        </chunk>
        <partition>
            <plan partitions="2"></plan>
        </partition>
    </step>
</job>

public class ImportItem {
    private Long id;
    private String name;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public ImportItem(long id, String name) {
        this.id = id;
        this.name = name;
    }

    @Override
    public String toString() {
        return "ImportItem{" + "id=" + id + ", name=" + name + '}';
    }
}
import java.util.List;

public class ImportItems {

    private List<ImportItem> items;

    public List<ImportItem> getItems() {
        return items;
    }

    public void setItems(List<ImportItem> items) {
        this.items = items;
    }

}
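
import jakarta.batch.runtime.context.JobContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

@Named
public class ImportJobContext {
    @Inject
    private JobContext jobContext;

    // Items still to be handed out to the readers of the current step.
    private final Queue<ImportItem> itemsToDo = new ConcurrentLinkedQueue<>();
    // Items already handed out; reloadQueue() turns them into the input of the next step.
    private final Queue<ImportItem> itemsForNextStep = new ConcurrentLinkedQueue<>();

    public void addItems(List<ImportItem> items) {
        getImportJobContext().itemsToDo.addAll(items);
    }

    public void reloadQueue() {
        ImportJobContext shared = getImportJobContext();
        // Without a scope annotation this bean is @Dependent, so every injection point gets
        // its own instance. Lock the shared instance, not "this".
        synchronized (shared) {
            shared.itemsToDo.clear();
            shared.itemsToDo.addAll(shared.itemsForNextStep);
            shared.itemsForNextStep.clear();
        }
    }

    // Returns up to 'count' items as one batch, or null when nothing is left
    // (ItemReader.readItem() returning null signals end of input).
    public List<ImportItem> getItems(int count) {
        ImportJobContext shared = getImportJobContext();
        synchronized (shared) {
            List<ImportItem> items = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                ImportItem item = shared.itemsToDo.poll();
                if (item == null) {
                    break; // queue is empty, stop polling
                }
                items.add(item);
                shared.itemsForNextStep.add(item);
            }
            return items.isEmpty() ? null : items;
        }
    }

    // Stores the first instance in the job's transient user data and
    // returns that stored instance on every subsequent call.
    private ImportJobContext getImportJobContext() {
        if (jobContext.getTransientUserData() == null) {
            jobContext.setTransientUserData(this);
        }
        return (ImportJobContext) jobContext.getTransientUserData();
    }
}
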
import jakarta.batch.api.AbstractBatchlet;
import jakarta.batch.api.BatchProperty;
import jakarta.batch.runtime.BatchStatus;
import jakarta.inject.Inject;
import jakarta.inject.Named;

import java.util.ArrayList;
import java.util.List;

@Named
public class DummyItemsBatchlet extends AbstractBatchlet {

    @Inject
    private ImportJobContext jobContext;

    @Inject
    @BatchProperty
    private String numberOfDummyItems;

    @Override
    public String process() throws Exception {

        List<ImportItem> list = new ArrayList<>();
        for(int i=0; i<Integer.parseInt(numberOfDummyItems); i++){
            list.add(new ImportItem(i, "dummyItem" + i));
        }
        jobContext.addItems(list);

        return BatchStatus.COMPLETED.name();
    }
}

import jakarta.batch.api.BatchProperty;
import jakarta.batch.api.chunk.AbstractItemReader;
import jakarta.inject.Inject;
import jakarta.inject.Named;

import java.util.List;

@Named
public class ItemReader extends AbstractItemReader {

    @Inject
    ImportJobContext importJobContext;

    // Batch size per readItem() call, set per step in the job XML.
    @Inject
    @BatchProperty
    private String numberOfItems;

    // Each call returns one List<ImportItem> as a single "item", so the processor
    // and writer receive whole batches. Returning null ends this partition's input.
    @Override
    public List<ImportItem> readItem() throws Exception {

        int numberOfWorkerItems = 2; // default when the property is not set
        if (numberOfItems != null) {
            numberOfWorkerItems = Integer.parseInt(numberOfItems);
        }

        return importJobContext.getItems(numberOfWorkerItems);
    }
}

import jakarta.batch.api.chunk.ItemProcessor;
import jakarta.inject.Named;

@Named
public class ItemMockProcessor implements ItemProcessor {

    @Override
    public Object processItem(Object o) throws Exception {
        System.out.println("--> processing " + o);
        return o;
    }
}

import jakarta.batch.api.chunk.AbstractItemWriter;
import jakarta.inject.Named;

import java.util.List;

@Named
public class ItemJpaWriter extends AbstractItemWriter {

    // Each element of 'list' is one batch (a List<ImportItem>) as returned by the processor.
    @Override
    @SuppressWarnings("unchecked")
    public void writeItems(List<Object> list) throws Exception {

        for (Object obj : list) {
            List<ImportItem> items = (List<ImportItem>) obj;
            System.out.println("--> Persisting " + items);
        }
    }
}
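
import jakarta.batch.api.AbstractBatchlet;
import jakarta.batch.runtime.BatchStatus;
import jakarta.inject.Inject;
import jakarta.inject.Named;

@Named
public class ReloadItemQueueBatchlet extends AbstractBatchlet {

    @Inject
    private ImportJobContext jobContext;

    // Moves the items handed out in the previous step back into the to-do queue for the next step.
    @Override
    public String process() throws Exception {
        System.out.println("ReloadItemQueueBatchlet.process");
        jobContext.reloadQueue();

        return BatchStatus.COMPLETED.name();
    }
}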

Can you please give me tips on how I can optimize the code?
