
mongodb - Mongo java driver Prematurely reached end of stream - Stack Overflow


I am performing a long-running aggregation in my Java application and I keep getting this error:

Got socket exception on connection [connectionId{localValue:5, serverValue:32617}] to mymongoserver:27017. All connections to mymongoser:27017 will be closed. org.springframework.data.mongodb.UncategorizedMongoDbException: Prematurely reached end of stream; nested exception is com.mongodb.MongoSocketReadException: Prematurely reached end of stream.

Here is my current config:

MongoDB URI: mongodb://user:mymongoserver:27017/dbname?authSource=dbname&replicaSet=rs-mdb&readPreference=secondaryPreferred&socketTimeoutMS=10000000&maxIdleTimeMS=10000000&connectTimeoutMS=10000000
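For reference, the same options can also be set programmatically instead of via URI query parameters. This is only a sketch with the MongoDB Java driver 4.x settings API, reusing the question's connection string (the `<password>` placeholder is hypothetical) and mirroring its `connectTimeoutMS`, `socketTimeoutMS`, and `maxIdleTimeMS` values:

```java
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;

import java.util.concurrent.TimeUnit;

public class MongoClientConfig {

    public static MongoClient create() {
        MongoClientSettings settings = MongoClientSettings.builder()
                // Connection string from the question, minus the timeout parameters,
                // which are set explicitly below. "<password>" is a placeholder.
                .applyConnectionString(new ConnectionString(
                        "mongodb://user:<password>@mymongoserver:27017/dbname"
                                + "?authSource=dbname&replicaSet=rs-mdb&readPreference=secondaryPreferred"))
                // connectTimeoutMS and socketTimeoutMS map to the socket settings
                .applyToSocketSettings(b -> b
                        .connectTimeout(10_000_000, TimeUnit.MILLISECONDS)
                        .readTimeout(10_000_000, TimeUnit.MILLISECONDS))
                // maxIdleTimeMS maps to the connection pool settings
                .applyToConnectionPoolSettings(b -> b
                        .maxConnectionIdleTime(10_000_000, TimeUnit.MILLISECONDS))
                .build();
        return MongoClients.create(settings);
    }
}
```

Note that none of these client-side timeouts prevent the server side (or an intermediate firewall/load balancer) from dropping an idle connection, which is a common cause of "Prematurely reached end of stream".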

and this is the aggregation:

public void cleanDuplicateActiveRecords() {
    try {
        int batchSize = 1000;
        long offset = 0;
        boolean hasMoreData = true;

        AggregationOptions options = AggregationOptions.builder()
                .maxTime(Duration.ofHours(2))
                .allowDiskUse(true)
                .build();

        while (hasMoreData) {
            // Step 1: Find duplicate active records grouped by externalId and
            // serviceSpecification.name (dot-notated fields re-mapped to simple names)
            Aggregation aggregation = Aggregation.newAggregation(
                    Aggregation.match(Criteria.where("order.state").in(Arrays.asList("ACTIVE", "SUSPENDED"))),
                    Aggregation.project("_id", "lastUpdated")
                            .and("order.externalId").as("externalId")
                            .and("order.serviceSpecification.name").as("orderName"),
                    Aggregation.sort(Sort.by(Sort.Direction.DESC, "lastUpdated")),
                    Aggregation.group("externalId", "orderName")
                            .push("_id").as("ids")
                            .push("lastUpdated").as("timestamps")
                            .count().as("count"),
                    Aggregation.match(Criteria.where("count").gt(1)),
                    Aggregation.project("ids", "timestamps"),
                    Aggregation.skip(offset),
                    Aggregation.limit(batchSize)
            ).withOptions(options);

            List<DuplicateOrder> duplicates = mongoTemplate
                    .aggregate(aggregation, "orders", DuplicateOrder.class)
                    .getMappedResults();

            if (duplicates.size() < batchSize) {
                hasMoreData = false;
            } else {
                offset += batchSize;
            }
            log.info("The number of duplicated records {}", duplicates.size());

            // Step 2: Delete all but the latest record for each group
            for (DuplicateOrder duplicate : duplicates) {
                List<String> ids = duplicate.getIds();
                if (ids.size() > 1) {
                    // Keep only the most recent record, delete the rest
                    List<String> idsToDelete = ids.subList(1, ids.size());
                    log.info("deleting records with ids: {}", idsToDelete);

                    Query deleteQuery = new Query(Criteria.where("_id").in(idsToDelete));
                    mongoTemplate.remove(deleteQuery, "orders");
                }
            }
        }
    } catch (Exception e) {
        log.error("Exception happened", e);
    }
}
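Separately from the socket error, the loop above combines `skip`-based pagination with deleting the very documents the aggregation matches, so each pass shrinks the result set while the offset keeps advancing, and some duplicate groups are never seen. The following self-contained sketch (no MongoDB required; the class and method names are hypothetical) simulates that interaction with integers standing in for duplicate groups:

```java
import java.util.ArrayList;
import java.util.List;

public class SkipPaginationDemo {

    // Mirrors the question's loop: take skip(offset).limit(batchSize) from the
    // *current* result set, "delete" everything returned, then advance the offset.
    static List<Integer> processWithSkip(int totalGroups, int batchSize) {
        List<Integer> remaining = new ArrayList<>();
        for (int i = 0; i < totalGroups; i++) {
            remaining.add(i);
        }

        List<Integer> processed = new ArrayList<>();
        int offset = 0;
        boolean hasMoreData = true;
        while (hasMoreData) {
            int from = Math.min(offset, remaining.size());
            int to = Math.min(offset + batchSize, remaining.size());
            List<Integer> page = new ArrayList<>(remaining.subList(from, to));
            processed.addAll(page);
            remaining.removeAll(page); // deletion shrinks the next "aggregation" result
            if (page.size() < batchSize) {
                hasMoreData = false;
            } else {
                offset += batchSize;   // but the offset still advances past unseen groups
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<Integer> processed = processWithSkip(10, 3);
        System.out.println("processed: " + processed); // [0, 1, 2, 6, 7, 8]
        System.out.println("missed: " + (10 - processed.size()) + " groups"); // missed: 4 groups
    }
}
```

Because the processed groups disappear from the next aggregation run, one alternative is to drop `skip`/`limit` entirely and simply re-run the batch from offset 0 until it returns empty, which would also keep each aggregation (and its cursor) short-lived rather than one multi-hour operation.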
