I am getting intermittent IllegalArgumentException in my dynamic destination implementation, though I am always setting partionBy value but I am not sure why sometimes I am getting into this error, is my implementation of mutableMapOf is incorrect , I am using flink runner it seems to work well for sometime and soon it start throwing IllegalArgumentException error as if partitionBy is missing. I need some advice and guidance on how to debug this issue.
object BigQueryConfig {
private val logger = LoggerFactory.getLogger(BigQueryConfig::class.java)
private val destinationToSchemaMap = mutableMapOf<String, TableSchema>()
private val destinationToPartitionMap = mutableMapOf<String, String>()
private fun getClusteringFields(): List<String> =
listOf("anization_id", "account_id")
fun createDestinations(args: BigQueryArgs) =
object : DynamicDestinations<SeriesFrameTransformedData, String>() {
override fun getTable(destination: String): TableDestination {
val partitionBy = destinationToPartitionMap[destination]
?: throw IllegalArgumentException("Partitioning configuration missing for destination: $destination")
return TableDestination(
destination,
null,
TimePartitioning().setType(partitionBy).setField("timestamp"),
Clustering().setFields(getClusteringFields()),
)
}
override fun getSchema(destination: String): TableSchema =
destinationToSchemaMap[destination]
?: throw IllegalArgumentException("No schema found for destination: $destination")
override fun getDestination(element: ValueInSingleWindow<SeriesFrameTransformedData>?): String {
val tableName = element?.value?.tableName
?: throw IllegalArgumentException("Table name cannot be null or empty")
val tableSchema = element.value.tableSchema
val destination = "${args.googleProjectId}:${args.googleDatasetId}.$tableName"
if (!destinationToPartitionMap.containsKey(destination)) {
destinationToPartitionMap[destination] = element.value.partitionBy
}
if (!destinationToSchemaMap.containsKey(destination)) {
logger.info("Creating BigQuery destination: $destination")
destinationToSchemaMap[destination] = tableSchema
}
return destination
}
}
}