I am performing a long-running aggregation in my Java application and I keep getting this error:
Got socket exception on connection [connectionId{localValue:5, serverValue:32617}] to mymongoserver:27017. All connections to mymongoserver:27017 will be closed. org.springframework.data.mongodb.UncategorizedMongoDbException: Prematurely reached end of stream; nested exception is com.mongodb.MongoSocketReadException: Prematurely reached end of stream.
Here is my current config:
MongoDB URI: mongodb://user:mymongoserver:27017/dbname?authSource=dbname&replicaSet=rs-mdb&readPreference=secondaryPreferred&socketTimeoutMS=10000000&maxIdleTimeMS=10000000&connectTimeoutMS=10000000
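For reference, I believe these URI options correspond to the following programmatic driver settings. This is only a minimal sketch assuming Spring Boot's MongoClientSettingsBuilderCustomizer; the class and bean names here are my own, not something already in my project:

import java.util.concurrent.TimeUnit;

import org.springframework.boot.autoconfigure.mongo.MongoClientSettingsBuilderCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MongoTimeoutConfig {

    // Hypothetical customizer mirroring socketTimeoutMS, connectTimeoutMS and
    // maxIdleTimeMS from the URI above.
    @Bean
    public MongoClientSettingsBuilderCustomizer mongoTimeoutCustomizer() {
        return builder -> builder
                .applyToSocketSettings(socket -> socket
                        .readTimeout(10_000_000, TimeUnit.MILLISECONDS)
                        .connectTimeout(10_000_000, TimeUnit.MILLISECONDS))
                .applyToConnectionPoolSettings(pool -> pool
                        .maxConnectionIdleTime(10_000_000, TimeUnit.MILLISECONDS));
    }
}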
and this is the aggregation:
public void cleanDuplicateActiveRecords() {
    try {
        int batchSize = 1000;
        long offset = 0;
        boolean hasMoreData = true;

        AggregationOptions options = AggregationOptions.builder()
                .maxTime(Duration.ofHours(2))
                .allowDiskUse(true)
                .build();

        while (hasMoreData) {
            // Step 1: Find duplicate active records grouped by externalId and
            // serviceSpecification.name (dot-notated fields are re-mapped to
            // simple field names in the projection first)
            Aggregation aggregation = Aggregation.newAggregation(
                    Aggregation.match(Criteria.where("order.state").in(Arrays.asList("ACTIVE", "SUSPENDED"))),
                    Aggregation.project("_id", "lastUpdated")
                            .and("order.externalId").as("externalId")
                            .and("order.serviceSpecification.name").as("orderName"),
                    Aggregation.sort(Sort.by(Sort.Direction.DESC, "lastUpdated")),
                    Aggregation.group("externalId", "orderName")
                            .push("_id").as("ids")
                            .push("lastUpdated").as("timestamps")
                            .count().as("count"),
                    Aggregation.match(Criteria.where("count").gt(1)),
                    Aggregation.project("ids", "timestamps"),
                    Aggregation.skip(offset),
                    Aggregation.limit(batchSize)
            ).withOptions(options);

            List<DuplicateOrder> duplicates = mongoTemplate
                    .aggregate(aggregation, "orders", DuplicateOrder.class)
                    .getMappedResults();

            if (duplicates.size() < batchSize) {
                hasMoreData = false;
            } else {
                offset += batchSize;
            }
            log.info("Number of duplicate groups in this batch: {}", duplicates.size());

            // Step 2: Delete all but the latest record for each group
            for (DuplicateOrder duplicate : duplicates) {
                List<String> ids = duplicate.getIds();
                if (ids.size() > 1) {
                    // ids were pushed after sorting by lastUpdated DESC, so keep
                    // the first (most recent) record and delete the rest
                    List<String> idsToDelete = ids.subList(1, ids.size());
                    log.info("Deleting records with ids: {}", idsToDelete);
                    Query deleteQuery = new Query(Criteria.where("_id").in(idsToDelete));
                    mongoTemplate.remove(deleteQuery, "orders");
                }
            }
        }
    } catch (Exception e) {
        log.error("Exception happened while cleaning duplicate records", e);
    }
}
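For completeness, the DuplicateOrder result class is essentially a plain holder for the projected fields (sketched here from the pipeline; the real class may carry extra annotations):

import java.util.Date;
import java.util.List;

// Maps the aggregation output; field names match the group/projection
// stage ("ids", "timestamps")
public class DuplicateOrder {

    private List<String> ids;
    private List<Date> timestamps;

    public List<String> getIds() {
        return ids;
    }

    public List<Date> getTimestamps() {
        return timestamps;
    }
}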