I am working on a Hazelcast Jet application and trying to join two sources (left, right, or inner join) on large data, but I am stuck at the point below. Here is my code:
BatchStage<Map<String,Object>> batch1= pipeline.readFrom(companyListBatchSource);
BatchStage<Map<String,Object>> batch2= pipeline.readFrom(employeeListBatchSource);
//Getting group by key
BatchStageWithKey<Map<String,Object>, Object> jdbcGroupByKey = batch1.groupingKey(a -> a.get(col1));
BatchStageWithKey<Map<String,Object>, Object> fileGroupByKey = batch2.groupingKey(b -> b.get(col2));
BatchStage<Entry<Object, Tuple2<List<Map<String,Object>>, List<Map<String,Object>>>>> d = jdbcGroupByKey.aggregate2(AggregateOperations.toList(), fileGroupByKey, AggregateOperations.toList());
BatchStage<List<Map<String,Object>>> joinedStage = d.filter(h ->
        // keep only keys present on both sides (inner join)
        !h.getValue().f0().isEmpty() && !h.getValue().f1().isEmpty()
).map(e -> {
    // emit one combined row per (left, right) pair for this key
    List<Map<String,Object>> rows = new ArrayList<>();
    e.getValue().f0().forEach(left ->
        e.getValue().f1().forEach(right -> {
            Map<String,Object> row = new HashMap<>(left);
            row.putAll(right);
            rows.add(row);
        })
    );
    return rows;
});
This works fine, but with large data it runs out of memory, because of this line:
BatchStage<Entry<Object, Tuple2<List<Map<String,Object>>, List<Map<String,Object>>>>> d = jdbcGroupByKey.aggregate2(AggregateOperations.toList(), fileGroupByKey, AggregateOperations.toList());
So what I need is to somehow write this intermediate data to a file and read that file back as a stream, so it doesn't blow up memory. Yes, it will be slower, but it won't run out of memory.
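To make the idea concrete, here is a minimal plain-Java sketch of the spill-and-stream approach I have in mind, written outside Jet just to illustrate (class name, record shapes, and the tab-separated file format are all made up for illustration): one side is written to a temp file once, and for each row of the other side the file is streamed line by line, so only one spilled row is in memory at a time.

```java
import java.io.*;
import java.nio.file.*;
import java.util.*;

// Spill-to-disk nested-loop join sketch: the right side is spilled to a
// temp file, then streamed once per left row instead of being held in memory.
public class SpillJoinSketch {

    // Write one join side to disk as "key<TAB>payload" lines.
    static Path spill(List<Map.Entry<String, String>> rows) throws IOException {
        Path file = Files.createTempFile("join-side", ".tsv");
        try (BufferedWriter w = Files.newBufferedWriter(file)) {
            for (Map.Entry<String, String> r : rows) {
                w.write(r.getKey() + "\t" + r.getValue());
                w.newLine();
            }
        }
        return file;
    }

    // Inner join: for each left row, stream the spilled file and emit
    // a combined record for every matching key.
    static List<String> join(List<Map.Entry<String, String>> left, Path spilled) throws IOException {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> l : left) {
            try (BufferedReader r = Files.newBufferedReader(spilled)) {
                String line;
                while ((line = r.readLine()) != null) {
                    String[] kv = line.split("\t", 2);
                    if (kv[0].equals(l.getKey())) {
                        out.add(l.getValue() + "|" + kv[1]);
                    }
                }
            }
        }
        return out;
    }
}
```

This trades one full file scan per left row for constant memory on the spilled side; sorting or bucketing the file by key would cut the scans down, but even this naive version never materializes both sides in memory at once.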
Would be great if someone could help.