
Why does Databricks Auto Loader crash after an error and how can I fix it?


We are using Databricks Auto Loader to process parquet files into Delta format. The job is scheduled to run once per day, and the code looks like this:

from pyspark.sql.functions import col as spark_col, current_timestamp, current_date

def run_autoloader(table_name, checkpoint_path, latest_file_location, new_columns):
    # Configure Auto Loader to ingest parquet data into a Delta table
    (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .option("cloudFiles.schemaLocation", checkpoint_path)       # where the inferred schema is tracked
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")  # allow new columns to appear over time
        .load(latest_file_location)
        .toDF(*new_columns)                                         # rename columns to our naming convention
        .select(
            "*",
            spark_col("_metadata.file_path").alias("source_file"),
            current_timestamp().alias("processing_time"),
            current_date().alias("processing_date"),
        )
        .writeStream
        .option("checkpointLocation", checkpoint_path)
        .option("mergeSchema", "true")
        .trigger(once=True)
        .toTable(table_name))
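For context, the scheduled job calls this function roughly as in the sketch below; the table name, S3 paths and column list are illustrative placeholders, not our real values.

# Hypothetical job entry point; the table name, S3 paths and column list
# below are placeholders, not the real pipeline configuration.
run_autoloader(
    table_name="analytics.daily_snapshot",
    checkpoint_path="s3://my-bucket/checkpoints/daily_snapshot",
    latest_file_location="s3://my-bucket/landing/daily_snapshot/",
    new_columns=["id", "value", "updated_at"],
)

# Block until every active trigger-once stream has drained, so the scheduled
# job does not exit while the write is still in flight.
for query in spark.streams.active:
    query.awaitTermination()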

On some occasions the parquet files have duplicate columns and the pipeline crashes, as expected. After fixing the error in the source data, however, the pipeline crashes with a mysterious error:

ERROR: Some streams terminated before this command could finish!

The stack trace suggests there has been an error while authenticating with S3, but I know that's not the case because I can read the data from a standalone notebook just fine. After digging around several forums, the only solution I found was to delete the Auto Loader checkpoint for the entire pipeline and start over. In our case this works because each day's data is a complete snapshot of all the data up to that day, so there is no data loss in the end, but we can't keep resorting to this in the future, especially not for other pipelines that load an incremental delta into a much bigger dataset.
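In case it helps, this is roughly what the reset looks like today (the checkpoint path is a placeholder); it forces Auto Loader to re-infer the schema and re-ingest everything on the next run, which only works for us because the daily files are full snapshots.

# Current workaround: wipe the Auto Loader checkpoint (which is also the
# schemaLocation in our setup) so the next scheduled run starts from scratch.
# The path below is a placeholder.
dbutils.fs.rm("s3://my-bucket/checkpoints/daily_snapshot", True)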

Has anyone come across this error before? If so, how can I work around it? It seems to happen only after a crash caused by some other error.

Any help would be much appreciated.
