最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

groovy - Nifi Grrovy Script : Failed to process FlowFile: org.apache.kafka.common.errors.SerializationException: Error serializi

programmeradmin2浏览0评论

I am writing the Groovy script for the KAFKA publisher for CSFLE logic.I am getting the error

ExecuteScript[id=750b36f8-979a-17a4-8c90-cc5ec789a122] Failed to process FlowFile: org.apache.kafkamon.errors.SerializationException: Error serializing Avro message

  • Caused by: org.apache.avro.AvroRuntimeException: No field named propertyChain in: class org.apache.groovy.json.internal.LazyMap

I have handled the default NULL values as well. Still it is not working.

import org.apache.nifi.processor.io.StreamCallback
import org.apache.avro.generic.GenericRecord
import java.nio.charset.StandardCharsets
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import groovy.json.JsonSlurper

// Load Avro schema from file
def schemaJson = new File("/etc/security/application/kafka/csfle/sales_adapter/commerce.reservation.reservations.sync_schema_dev.json").text
def schema = new Schema.Parser().parse(schemaJson)

def createGenericRecord(Schema schema, def jsonData) {
    if (jsonData == null) {
        return null
    }

    def record = new GenericData.Record(schema)

    schema.getFields().each { field ->
        def fieldName = field.name()
        def fieldSchema = field.schema()
        def fieldValue = jsonData.containsKey(fieldName) ? jsonData.get(fieldName) : null

        if (fieldValue == null) {
            record.put(fieldName, handleMissingField(fieldSchema)) // Handling missing fields
        } else {
            record.put(fieldName, handleFieldValue(fieldSchema, fieldValue)) // Handling field values
        }
    }

    return record
}

def handleMissingField(Schema fieldSchema) {
    if (isNullable(fieldSchema)) {
        return null
    } else if (fieldSchema.getType() == Schema.Type.RECORD) {
        return createEmptyRecord(fieldSchema)  // Create an empty record
    } else {
        return createDefaultValue(fieldSchema)  // Assign default values
    }
}

def createEmptyRecord(Schema recordSchema) {
    def emptyRecord = new GenericData.Record(recordSchema)

    recordSchema.getFields().each { field ->
        def fieldSchema = field.schema()
        if (isNullable(fieldSchema)) {
            emptyRecord.put(field.name(), null)
        } else {
            emptyRecord.put(field.name(), createDefaultValue(fieldSchema))
        }
    }

    return emptyRecord
}

def createDefaultValue(Schema fieldSchema) {
    switch (fieldSchema.getType()) {
        case Schema.Type.STRING:
            return ""
        case Schema.Type.INT:
            return 0
        case Schema.Type.LONG:
            return 0L
        case Schema.Type.FLOAT:
            return 0.0f
        case Schema.Type.DOUBLE:
            return 0.0d
        case Schema.Type.BOOLEAN:
            return false
        case Schema.Type.RECORD:
            return createEmptyRecord(fieldSchema)
        case Schema.Type.ARRAY:
            return new GenericData.Array<>(fieldSchema, [])
        case Schema.Type.MAP:
            return [:]
        case Schema.Type.UNION:
            return handleUnionType(fieldSchema, null)
        default:
            return null  // Assign null for unknown types
    }
}

def isNullable(Schema schema) {
    if (schema.getType() == Schema.Type.NULL) {
        return true
    }

    if (schema.getType() == Schema.Type.UNION) {
        return schema.getTypes().any { it.getType() == Schema.Type.NULL }
    }

    return false
}

def handleFieldValue(Schema fieldSchema, def value) {
    switch (fieldSchema.getType()) {
        case Schema.Type.STRING:
            return value.toString()
        case Schema.Type.INT:
            return value as Integer
        case Schema.Type.LONG:
            return value as Long
        case Schema.Type.FLOAT:
            return value as Float
        case Schema.Type.DOUBLE:
            return value as Double
        case Schema.Type.BOOLEAN:
            return value as Boolean
        case Schema.Type.ENUM:
            return new GenericData.EnumSymbol(fieldSchema, value)
        case Schema.Type.RECORD:
            return (value instanceof Map) ? createGenericRecord(fieldSchema, value) : new GenericData.Record(fieldSchema)
        case Schema.Type.ARRAY:
            return new GenericData.Array<>(fieldSchema, value as List)
        case Schema.Type.MAP:
            return new HashMap<>(value as Map)
        case Schema.Type.UNION:
            return handleUnionType(fieldSchema, value)
        default:
            throw new IllegalArgumentException("Unsupported field type: ${fieldSchema.getType()}")
    }
}

def handleUnionType(Schema fieldSchema, def value) {
    for (Schema typeSchema : fieldSchema.getTypes()) {
        if (typeSchema.getType() == Schema.Type.NULL && value == null) {
            return null
        }
        if (typeSchema.getType() == Schema.Type.RECORD && value instanceof Map) {
            return createGenericRecord(typeSchema, value)
        }
        if (typeSchema.getType() == Schema.Type.STRING && value instanceof String) {
            return value
        }
        if (typeSchema.getType() == Schema.Type.INT && value instanceof Integer) {
            return value
        }
        if (typeSchema.getType() == Schema.Type.LONG && value instanceof Long) {
            return value
        }
        if (typeSchema.getType() == Schema.Type.BOOLEAN && value instanceof Boolean) {
            return value
        }
        if (typeSchema.getType() == Schema.Type.FLOAT && value instanceof Float) {
            return value
        }
        if (typeSchema.getType() == Schema.Type.DOUBLE && value instanceof Double) {
            return value
        }
        if (typeSchema.getType() == Schema.Type.ARRAY && value instanceof List) {
            return new GenericData.Array<>(typeSchema, value as List)
        }
        if (typeSchema.getType() == Schema.Type.MAP && value instanceof Map) {
            return new HashMap<>(value as Map)
        }
        if (typeSchema.getType() == Schema.Type.ENUM) {
            return new GenericData.EnumSymbol(typeSchema, value)
        }
    }
    throw new IllegalArgumentException("No matching type found in union for value: $value")
}

def getConfigProperties() {
    def configProps = new Properties()

    configProps.put("bootstrap.servers", context.getProperty("bootstrap.servers").getValue())
    configProps.put("client.id", "nifi-csfle-playground-producer-client1")
    configProps.put("key.serializer", "org.apache.kafkamon.serialization.StringSerializer")
    configProps.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
    configProps.put("security.protocol", "SASL_SSL")
    configProps.put("sasl.jaas.config", "org.apache.kafkamon.security.plain.PlainLoginModule required username='"
        + context.getProperty("sasl_jaas.username").getValue() + "' password='"
        + context.getProperty("sasl_jaas.password").getValue() + "';")
    configProps.put("sasl.mechanism", "PLAIN")
    configProps.put("schema.registry.url", context.getProperty("schema.registry.url").getValue())
    configProps.put("basic.auth.credentials.source", "USER_INFO")
    configProps.put("basic.auth.user.info", context.getProperty("schema_registry.username").getValue() + ":"
        + context.getProperty("schema_registry.password").getValue())
    configProps.put("auto.register.schemas", "false")
    configProps.put("use.latest.version", "false")
    configProps.put("use.latest.with.metadata", "mi.schema.build.number=1.1")

    return configProps
}

def publishMessage(Schema schema) {
    def configProps = getConfigProperties()
    def producer = new KafkaProducer<String, GenericRecord>(configProps)
    def topic = "sales.sales-agent-commission.reservation.actualized.sync"

    def flowFile = session.get()
    if (!flowFile) {
        return
    }

    try {
        session.read(flowFile, { inputStream ->
            BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
            def jsonSlurper = new JsonSlurper()
            def jsonData = jsonSlurper.parse(reader)

            // Ensure all schema fields are present in JSON data
            schema.getFields().each { field ->
                if (!jsonData.containsKey(field.name())) {
                    jsonData[field.name()] = null // Add missing fields with null values
                }
            }

            // Convert JSON to Avro GenericRecord
            def newRecord = createGenericRecord(schema, jsonData)
            def randomKey = UUID.randomUUID().toString()
            def sendRecord = new ProducerRecord<String, GenericRecord>(topic, randomKey, newRecord)

            producer.send(sendRecord, { metadata, exception ->
                if (exception) {
                    log.error("Kafka Publish Error: ${exception.message}")
                    session.transfer(flowFile, REL_FAILURE)
                } else {
                    log.info("Message sent to ${metadata.topic()} [${metadata.partition()}] offset ${metadata.offset()}")
                    session.transfer(flowFile, REL_SUCCESS)
                }
            })
        })
    } catch (Exception e) {
        log.error("Failed to process FlowFile", e)
        session.transfer(flowFile, REL_FAILURE)
    } finally {
        producer.close()
    }
}
Thread.currentThread().setContextClassLoader(this.class.classLoader)
log.info("Starting Kafka Producer...")
publishMessage(schema)

I have tried fixing the code added default null values in the code.

与本文相关的文章

发布评论

评论列表(0)

  1. 暂无评论