I am writing the Groovy script for the KAFKA publisher for CSFLE logic.I am getting the error
ExecuteScript[id=750b36f8-979a-17a4-8c90-cc5ec789a122] Failed to process FlowFile: org.apache.kafkamon.errors.SerializationException: Error serializing Avro message
- Caused by: org.apache.avro.AvroRuntimeException: No field named propertyChain in: class org.apache.groovy.json.internal.LazyMap
I have handled the default NULL values as well. Still it is not working.
import org.apache.nifi.processor.io.StreamCallback
import org.apache.avro.generic.GenericRecord
import java.nio.charset.StandardCharsets
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import groovy.json.JsonSlurper
// Load Avro schema from file
def schemaJson = new File("/etc/security/application/kafka/csfle/sales_adapter/commerce.reservation.reservations.sync_schema_dev.json").text
def schema = new Schema.Parser().parse(schemaJson)
def createGenericRecord(Schema schema, def jsonData) {
if (jsonData == null) {
return null
}
def record = new GenericData.Record(schema)
schema.getFields().each { field ->
def fieldName = field.name()
def fieldSchema = field.schema()
def fieldValue = jsonData.containsKey(fieldName) ? jsonData.get(fieldName) : null
if (fieldValue == null) {
record.put(fieldName, handleMissingField(fieldSchema)) // Handling missing fields
} else {
record.put(fieldName, handleFieldValue(fieldSchema, fieldValue)) // Handling field values
}
}
return record
}
def handleMissingField(Schema fieldSchema) {
if (isNullable(fieldSchema)) {
return null
} else if (fieldSchema.getType() == Schema.Type.RECORD) {
return createEmptyRecord(fieldSchema) // Create an empty record
} else {
return createDefaultValue(fieldSchema) // Assign default values
}
}
def createEmptyRecord(Schema recordSchema) {
def emptyRecord = new GenericData.Record(recordSchema)
recordSchema.getFields().each { field ->
def fieldSchema = field.schema()
if (isNullable(fieldSchema)) {
emptyRecord.put(field.name(), null)
} else {
emptyRecord.put(field.name(), createDefaultValue(fieldSchema))
}
}
return emptyRecord
}
def createDefaultValue(Schema fieldSchema) {
switch (fieldSchema.getType()) {
case Schema.Type.STRING:
return ""
case Schema.Type.INT:
return 0
case Schema.Type.LONG:
return 0L
case Schema.Type.FLOAT:
return 0.0f
case Schema.Type.DOUBLE:
return 0.0d
case Schema.Type.BOOLEAN:
return false
case Schema.Type.RECORD:
return createEmptyRecord(fieldSchema)
case Schema.Type.ARRAY:
return new GenericData.Array<>(fieldSchema, [])
case Schema.Type.MAP:
return [:]
case Schema.Type.UNION:
return handleUnionType(fieldSchema, null)
default:
return null // Assign null for unknown types
}
}
def isNullable(Schema schema) {
if (schema.getType() == Schema.Type.NULL) {
return true
}
if (schema.getType() == Schema.Type.UNION) {
return schema.getTypes().any { it.getType() == Schema.Type.NULL }
}
return false
}
def handleFieldValue(Schema fieldSchema, def value) {
switch (fieldSchema.getType()) {
case Schema.Type.STRING:
return value.toString()
case Schema.Type.INT:
return value as Integer
case Schema.Type.LONG:
return value as Long
case Schema.Type.FLOAT:
return value as Float
case Schema.Type.DOUBLE:
return value as Double
case Schema.Type.BOOLEAN:
return value as Boolean
case Schema.Type.ENUM:
return new GenericData.EnumSymbol(fieldSchema, value)
case Schema.Type.RECORD:
return (value instanceof Map) ? createGenericRecord(fieldSchema, value) : new GenericData.Record(fieldSchema)
case Schema.Type.ARRAY:
return new GenericData.Array<>(fieldSchema, value as List)
case Schema.Type.MAP:
return new HashMap<>(value as Map)
case Schema.Type.UNION:
return handleUnionType(fieldSchema, value)
default:
throw new IllegalArgumentException("Unsupported field type: ${fieldSchema.getType()}")
}
}
def handleUnionType(Schema fieldSchema, def value) {
for (Schema typeSchema : fieldSchema.getTypes()) {
if (typeSchema.getType() == Schema.Type.NULL && value == null) {
return null
}
if (typeSchema.getType() == Schema.Type.RECORD && value instanceof Map) {
return createGenericRecord(typeSchema, value)
}
if (typeSchema.getType() == Schema.Type.STRING && value instanceof String) {
return value
}
if (typeSchema.getType() == Schema.Type.INT && value instanceof Integer) {
return value
}
if (typeSchema.getType() == Schema.Type.LONG && value instanceof Long) {
return value
}
if (typeSchema.getType() == Schema.Type.BOOLEAN && value instanceof Boolean) {
return value
}
if (typeSchema.getType() == Schema.Type.FLOAT && value instanceof Float) {
return value
}
if (typeSchema.getType() == Schema.Type.DOUBLE && value instanceof Double) {
return value
}
if (typeSchema.getType() == Schema.Type.ARRAY && value instanceof List) {
return new GenericData.Array<>(typeSchema, value as List)
}
if (typeSchema.getType() == Schema.Type.MAP && value instanceof Map) {
return new HashMap<>(value as Map)
}
if (typeSchema.getType() == Schema.Type.ENUM) {
return new GenericData.EnumSymbol(typeSchema, value)
}
}
throw new IllegalArgumentException("No matching type found in union for value: $value")
}
def getConfigProperties() {
def configProps = new Properties()
configProps.put("bootstrap.servers", context.getProperty("bootstrap.servers").getValue())
configProps.put("client.id", "nifi-csfle-playground-producer-client1")
configProps.put("key.serializer", "org.apache.kafkamon.serialization.StringSerializer")
configProps.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
configProps.put("security.protocol", "SASL_SSL")
configProps.put("sasl.jaas.config", "org.apache.kafkamon.security.plain.PlainLoginModule required username='"
+ context.getProperty("sasl_jaas.username").getValue() + "' password='"
+ context.getProperty("sasl_jaas.password").getValue() + "';")
configProps.put("sasl.mechanism", "PLAIN")
configProps.put("schema.registry.url", context.getProperty("schema.registry.url").getValue())
configProps.put("basic.auth.credentials.source", "USER_INFO")
configProps.put("basic.auth.user.info", context.getProperty("schema_registry.username").getValue() + ":"
+ context.getProperty("schema_registry.password").getValue())
configProps.put("auto.register.schemas", "false")
configProps.put("use.latest.version", "false")
configProps.put("use.latest.with.metadata", "mi.schema.build.number=1.1")
return configProps
}
def publishMessage(Schema schema) {
def configProps = getConfigProperties()
def producer = new KafkaProducer<String, GenericRecord>(configProps)
def topic = "sales.sales-agent-commission.reservation.actualized.sync"
def flowFile = session.get()
if (!flowFile) {
return
}
try {
session.read(flowFile, { inputStream ->
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
def jsonSlurper = new JsonSlurper()
def jsonData = jsonSlurper.parse(reader)
// Ensure all schema fields are present in JSON data
schema.getFields().each { field ->
if (!jsonData.containsKey(field.name())) {
jsonData[field.name()] = null // Add missing fields with null values
}
}
// Convert JSON to Avro GenericRecord
def newRecord = createGenericRecord(schema, jsonData)
def randomKey = UUID.randomUUID().toString()
def sendRecord = new ProducerRecord<String, GenericRecord>(topic, randomKey, newRecord)
producer.send(sendRecord, { metadata, exception ->
if (exception) {
log.error("Kafka Publish Error: ${exception.message}")
session.transfer(flowFile, REL_FAILURE)
} else {
log.info("Message sent to ${metadata.topic()} [${metadata.partition()}] offset ${metadata.offset()}")
session.transfer(flowFile, REL_SUCCESS)
}
})
})
} catch (Exception e) {
log.error("Failed to process FlowFile", e)
session.transfer(flowFile, REL_FAILURE)
} finally {
producer.close()
}
}
Thread.currentThread().setContextClassLoader(this.class.classLoader)
log.info("Starting Kafka Producer...")
publishMessage(schema)
I have tried fixing the code added default null values in the code.