
Kafka Connect and QuestDB with Protobuf


I have a table in QuestDB with this structure

CREATE TABLE IF NOT EXISTS 'transactions' (
  timestamp TIMESTAMP,
  merchant SYMBOL capacity 5000 CACHE,
  category SYMBOL capacity 256 CACHE,
  amt DOUBLE,
  gender SYMBOL capacity 256 CACHE,
  city SYMBOL capacity 2000 CACHE,
  state SYMBOL capacity 256 CACHE,
  first VARCHAR,
  last VARCHAR,
  street VARCHAR,
  job VARCHAR,
  trans_num VARCHAR,
  cc_num LONG,
  zip LONG,
  city_pop LONG,
  dob LONG,
  lat DOUBLE,
  lon DOUBLE,
  merch_lat DOUBLE,
  merch_long DOUBLE
) timestamp (timestamp) PARTITION BY DAY WAL DEDUP UPSERT KEYS(timestamp, trans_num);

I can ingest data directly using ILP, but I want to put Kafka in front of the database so I can restart the server for upgrades or maintenance without disrupting ingestion, and I will be sending the data in Protobuf format.

The QuestDB docs and repository mention AVRO, but there is no mention of Protobuf. Does anyone know if it is supported and how I would configure it?

asked Jan 29 at 18:17 by Javier Ramirez

1 Answer


It turns out that in Kafka Connect, connectors are oblivious to (de-)serialization: the Kafka Connect framework transcodes messages into a common abstract format, regardless of the serde used.

This means converter configurations can be reused across connectors, and we can use Protobuf in the same way we can use AVRO. By the time the QuestDB connector gets the message, it is already in a known format.

This would be the command for registering the QuestDB connector using Protobuf, for a schema matching the table above:

curl -s -X PUT -H  "Content-Type:application/json" http://localhost:8083/connectors/questdb-transactions/config -d '{
          "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
          "tasks.max": "5",
          "topics": "transactions",
          "client.conf.string": "http::addr=${QUESTDB_HTTP_ENDPOINT};auto_flush_interval=500;retry_timeout=100000;",
          "name": "questdb-transactions",
          "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
          "value.converter.schema.registry.url": "http://${SCHEMA_REGISTRY_HTTP_ENDPOINT}",
          "include.key": false,
          "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
          "key.converter.schema.registry.url": "http://${SCHEMA_REGISTRY_HTTP_ENDPOINT}",
          "table": "transactions",
          "symbols": "merchant, category, gender, city, state",
          "timestamp.field.name": "timestamp",
          "value.converter.schemas.enable": true
      }'

Of course, values like the maximum number of tasks, the topic and connector names, and any URLs should be tailored to the specific use case, but that is the general way of working with Protobuf.
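
Once the connector is registered, its state can be checked through the Kafka Connect REST API. The command below assumes the same connector name and local Connect port as in the example above:

# Both the connector and its tasks should report state RUNNING
curl -s http://localhost:8083/connectors/questdb-transactions/status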

Note we are adding parameters like auto_flush_interval, to control how often the connector sends data into QuestDB, and retry_timeout, which keeps retrying for up to 100 seconds after an error, so ingestion can recover from a server restart without manual intervention.
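
As a quick end-to-end smoke test, one option is to produce a single record with Confluent's kafka-protobuf-console-producer and then query QuestDB over its REST API. This is only a sketch: it assumes the Confluent Platform CLI tools, a broker at localhost:9092, a Schema Registry at localhost:8081, and QuestDB's HTTP endpoint on the default port 9000, and the inline schema covers just a few of the table's columns.

# Produce one test record; the inline proto schema is a hypothetical
# subset of the table columns, with the timestamp as epoch microseconds
kafka-protobuf-console-producer \
  --bootstrap-server localhost:9092 \
  --topic transactions \
  --property schema.registry.url=http://localhost:8081 \
  --property value.schema='syntax = "proto3"; message Transaction { int64 timestamp = 1; string merchant = 2; string category = 3; double amt = 4; string trans_num = 5; }'
# then type a JSON-encoded record on stdin, for example:
# {"timestamp": 1738180000000000, "merchant": "acme", "category": "misc", "amt": 12.5, "trans_num": "abc123"}

# Confirm the row landed in QuestDB
curl -G http://localhost:9000/exec --data-urlencode "query=SELECT count() FROM transactions;"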
