Using DynamicDestination in Apache Beam gives intermittent error with pass down values - Stack Overflow

I am getting an intermittent IllegalArgumentException in my DynamicDestinations implementation. I always set the partitionBy value, so I am not sure why I sometimes hit this error. Is my use of mutableMapOf incorrect? I am using the Flink runner. It works well for some time and then starts throwing IllegalArgumentException, as if partitionBy were missing. I need some advice and guidance on how to debug this issue.

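import com.google.api.services.bigquery.model.Clustering
import com.google.api.services.bigquery.model.TableSchema
import com.google.api.services.bigquery.model.TimePartitioning
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination
import org.apache.beam.sdk.values.ValueInSingleWindow
import org.slf4j.LoggerFactory

object BigQueryConfig {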
    private val logger = LoggerFactory.getLogger(BigQueryConfig::class.java)

    private val destinationToSchemaMap = mutableMapOf<String, TableSchema>()
    private val destinationToPartitionMap = mutableMapOf<String, String>()

    private fun getClusteringFields(): List<String> =
        listOf("anization_id", "account_id")

    fun createDestinations(args: BigQueryArgs) =
        object : DynamicDestinations<SeriesFrameTransformedData, String>() {
            override fun getTable(destination: String): TableDestination {
                val partitionBy = destinationToPartitionMap[destination]
                    ?: throw IllegalArgumentException("Partitioning configuration missing for destination: $destination")

                return TableDestination(
                    destination,
                    null,
                    TimePartitioning().setType(partitionBy).setField("timestamp"),
                    Clustering().setFields(getClusteringFields()),
                )
            }

            override fun getSchema(destination: String): TableSchema =
                destinationToSchemaMap[destination]
                    ?: throw IllegalArgumentException("No schema found for destination: $destination")

            override fun getDestination(element: ValueInSingleWindow<SeriesFrameTransformedData>?): String {
                val tableName = element?.value?.tableName
                    ?: throw IllegalArgumentException("Table name cannot be null or empty")
                val tableSchema = element.value.tableSchema
                val destination = "${args.googleProjectId}:${args.googleDatasetId}.$tableName"

                if (!destinationToPartitionMap.containsKey(destination)) {
                    destinationToPartitionMap[destination] = element.value.partitionBy
                }
                if (!destinationToSchemaMap.containsKey(destination)) {
                    logger.info("Creating BigQuery destination: $destination")
                    destinationToSchemaMap[destination] = tableSchema
                }

                return destination
            }
        }
}
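
One thing worth checking while debugging, offered as a hypothesis rather than a confirmed diagnosis: the two maps live in a Kotlin `object`, so each worker JVM holds its own copy. If `getTable` runs in a JVM where `getDestination` was never called for that destination (for example after a shuffle or a restart from a checkpoint), the lookup would miss and throw exactly this exception. A minimal sketch of one way to avoid shared state is to pack the partitioning type into the destination value itself. The `Destination` class and the `|` separator below are hypothetical helpers for illustration, not part of Apache Beam:

```kotlin
// Hypothetical helper (not a Beam API): carries everything getTable needs
// inside the destination value, so no JVM-local map lookup is required.
data class Destination(val tableSpec: String, val partitionBy: String) {

    // Encode as one string so it can still be used as the String destination
    // type of DynamicDestinations<SeriesFrameTransformedData, String>.
    // Assumption: '|' never appears in a "project:dataset.table" spec.
    fun encode(): String = "$tableSpec|$partitionBy"

    companion object {
        // Recover the table spec and partitioning type from the encoded string.
        fun decode(encoded: String): Destination {
            val idx = encoded.lastIndexOf('|')
            require(idx > 0 && idx < encoded.length - 1) {
                "Malformed destination: $encoded"
            }
            return Destination(
                tableSpec = encoded.substring(0, idx),
                partitionBy = encoded.substring(idx + 1),
            )
        }
    }
}

fun main() {
    // What getDestination would return for an element (sample values are made up).
    val encoded = Destination("my-project:my_dataset.events", "DAY").encode()

    // What getTable would do on any worker, without consulting a shared map.
    val decoded = Destination.decode(encoded)
    println(decoded)
}
```

With this shape, `getDestination` would return `Destination(...).encode()` and `getTable` would call `Destination.decode(destination)` to build the `TimePartitioning`. The schema map has the same exposure and would need a similar treatment, for instance by deriving the schema from data that travels with the destination.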