
How to ensure Neo4j transaction changes are sent as a single Kafka message in a Spring Boot application - Stack Overflow


I have a setup where I’m using a Neo4j source connector to propagate change events to a Kafka topic. My Spring Boot application consumes these messages. However, when a transaction in Neo4j involves multiple changes, the connector sends each change as a separate Kafka message (each with the same txId but its own sequence number). I want all changes from a single transaction to be delivered to my application as one Kafka message so that I can process them together.

Sample message from Kafka:

{
  "id": "CJUg4WrNW0Y7ttlh8lbkxfwAAAAAAAAEMAAAAAAAAAACAAABkh21o5c=",
  "txId": 1072,
  "seq": 2,
  "event": {
    "elementId": "4:9520e16a-cd5b-463b-b6d9-61f256e4c5fc:2073",
    "eventType": "NODE",
    "operation": "CREATE",
    "labels": [
      "Environment"
    ],
    "keys": {},
    "state": {
      "before": null,
      "after": {
        "labels": [
          "Environment"
        ],
        "properties": {
          "name": {
            "B": null,
            "I64": null,
            "F64": null,
            "S": "Dev",
            "BA": null,
            "TLD": null,
            "TLDT": null,
            "TLT": null,
            "TZDT": null,
            "TOT": null,
            "TD": null,
            "SP": null,
            "LB": null,
            "LI64": null,
            "LF64": null,
            "LS": null,
            "LTLD": null,
            "LTLDT": null,
            "LTLT": null,
            "LZDT": null,
            "LTOT": null,
            "LTD": null,
            "LSP": null
          },
          "id": {
            "B": null,
            "I64": null,
            "F64": null,
            "S": "78b90e78-9b79-4330-9d02-7895f349964b",
            "BA": null,
            "TLD": null,
            "TLDT": null,
            "TLT": null,
            "TZDT": null,
            "TOT": null,
            "TD": null,
            "SP": null,
            "LB": null,
            "LI64": null,
            "LF64": null,
            "LS": null,
            "LTLD": null,
            "LTLDT": null,
            "LTLT": null,
            "LZDT": null,
            "LTOT": null,
            "LTD": null,
            "LSP": null
          }
        }
      }
    }
  }
}

Current Setup

My Neo4j server version is 5.22, and my Kafka Connect connector version is 5.1.1.

Spring Boot Application: Consumes messages from the Kafka topic and processes them.

Problem

When a Neo4j transaction involves multiple changes (e.g., creating multiple nodes or relationships), the connector sends each change as a separate Kafka message. This makes it difficult to process the changes as a single unit in my application.

Example Scenario

  1. A Neo4j transaction creates 3 nodes and 2 relationships.
  2. The connector sends 5 separate Kafka messages (one for each change).
  3. I want these 5 changes to be sent as a single Kafka message.

What I’ve Tried

I looked into the Neo4j connector configuration but couldn’t find a way to group changes by transaction.

I considered aggregating messages in my Spring Boot application in an in-memory buffer, but this feels error-prone and complex.
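For illustration, the buffering idea could be sketched roughly as below. This is a minimal, hypothetical sketch, not connector or Spring Kafka API: `TxAggregator` and its explicit `flush` call are invented names, and a real listener would also need a timeout or a last-sequence marker to flush transactions whose final event never arrives.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch: buffer change events by txId and hand the whole
// group to a callback in one flush. In a real consumer, flush would be
// driven by a timer or an end-of-transaction marker rather than called
// explicitly.
public class TxAggregator {
    private final Map<Long, List<String>> buffer = new HashMap<>();
    private final Consumer<List<String>> onComplete;

    public TxAggregator(Consumer<List<String>> onComplete) {
        this.onComplete = onComplete;
    }

    // Called once per incoming Kafka record payload.
    public void accept(long txId, String event) {
        buffer.computeIfAbsent(txId, k -> new ArrayList<>()).add(event);
    }

    // Emit all buffered events for one transaction as a single unit.
    public void flush(long txId) {
        List<String> events = buffer.remove(txId);
        if (events != null) {
            onComplete.accept(events);
        }
    }
}
```

The fragile part, as noted above, is deciding when a transaction is complete: the CDC message carries a `seq` but no total count, so completeness can only be inferred (e.g. by an idle timeout per txId).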

asked Feb 11 at 12:46 by Sarthak Sharma, edited Feb 11 at 12:54

1 Answer


Unfortunately, this is not supported. The Neo4j Connector for Kafka publishes change messages to the target topic(s) exactly as it receives them from CDC, one message per change. This is by design: bundling all of a transaction's change events into a single message could hit memory or message-size limits, especially for large transactions.

If this is a must for you, the in-memory aggregation you already suggested is probably the best option.
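One detail that makes consumer-side aggregation more tractable: within a partition, the events of one transaction arrive in order, so if the connector keys records such that a transaction's events land contiguously, a group can be emitted whenever the txId changes. The sketch below illustrates that boundary-detection idea on an ordered list of records; `TxGrouper` is a hypothetical name, and the contiguity assumption must be verified against your actual topic/key configuration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: split an ordered stream of (txId, event) records
// into per-transaction batches, closing a batch when the txId changes.
// Assumes all records of one transaction arrive contiguously.
public class TxGrouper {
    public static List<List<String>> group(List<Map.Entry<Long, String>> records) {
        List<List<String>> batches = new ArrayList<>();
        Long currentTx = null;
        List<String> current = new ArrayList<>();
        for (Map.Entry<Long, String> r : records) {
            if (currentTx != null && !currentTx.equals(r.getKey())) {
                batches.add(current);       // txId changed: close the batch
                current = new ArrayList<>();
            }
            currentTx = r.getKey();
            current.add(r.getValue());
        }
        if (!current.isEmpty()) {
            batches.add(current);           // final open batch
        }
        return batches;
    }
}
```

The caveat from the answer still applies: the last transaction's batch can only be closed once a record with a new txId (or a timeout) arrives, so some latency or a flush policy is unavoidable.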
