
Triggering Downstream Workflow in Databricks from New Inserts in Snowflake - Stack Overflow


I have a table in Snowflake that tracks newly added items, and a downstream data processing workflow that needs to be triggered whenever new items are added. I'm currently using Lakehouse Federation to query the Snowflake tables in Databricks.

How can I set up a mechanism to trigger the downstream data processing step with the newly added items? For example, if table X in Snowflake receives a new insert with item_id = 84848, the workflow should be triggered to run analysis based on this item_id. The trigger can be either interval-based or event-driven.

What would be the best approach to implement this in Databricks?

Asked Mar 29 at 17:25 by user19192927 (edited Mar 29 at 17:28)

1 Answer

Can you write a SQL query that returns only the newly added rows/items? If so, you can run that query from a Databricks Spark job through the Snowflake Spark connector. This documentation page shows how to read data using a query:
https://docs.snowflake/en/user-guide/spark-connector-use#using-the-connector-in-scala

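import org.apache.spark.sql.{DataFrame, SparkSession}
import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME
val spark = SparkSession.builder().getOrCreate()
// Snowflake connection settings (placeholder values)
val sfOptions = Map("sfURL" -> "<account>.snowflakecomputing.com", "sfUser" -> "<user>",
  "sfPassword" -> "<password>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>")
// Placeholder: restrict this query to the newly added rows only
val query = "SELECT * FROM X"
// Read the query result with query pushdown disabled
val df: DataFrame = spark.read.format(SNOWFLAKE_SOURCE_NAME)
  .options(sfOptions)
  .option("query", query)
  .option("autopushdown", "off")
  .load()
// Same pattern with an aggregate query (separate name so df is not redefined)
val dfSalary: DataFrame = spark.read
  .format(SNOWFLAKE_SOURCE_NAME)
  .options(sfOptions)
  .option("query", "SELECT DEPT, SUM(SALARY) AS SUM_SALARY FROM T1")
  .load()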
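For an interval-based trigger, the "new rows only" query can be built from a high-water mark: the largest item_id already processed. Below is a minimal Scala sketch, assuming that item_id in table X increases with every insert (an assumption; check how item_id is generated for your table). IncrementalTrigger, incrementalQuery, pollOnce, fetchNewIds and process are hypothetical names: fetchNewIds stands in for the Spark read above, and process stands in for the downstream analysis of one item.

```scala
// Hypothetical sketch of an interval-based (polling) trigger.
// Assumption: item_id in table X increases with every insert.
object IncrementalTrigger {

  // Builds a query selecting only rows added after the last processed item_id.
  def incrementalQuery(table: String, lastProcessedId: Long): String =
    s"SELECT item_id FROM $table WHERE item_id > $lastProcessedId ORDER BY item_id"

  // Runs one polling cycle and returns the new high-water mark.
  // fetchNewIds stands in for the Spark read (query => item_ids);
  // process stands in for the downstream analysis of one item.
  def pollOnce(
      table: String,
      lastProcessedId: Long,
      fetchNewIds: String => Seq[Long],
      process: Long => Unit
  ): Long = {
    val newIds = fetchNewIds(incrementalQuery(table, lastProcessedId)).sorted
    newIds.foreach(process)
    // Advance the watermark only after every new item has been processed.
    if (newIds.isEmpty) lastProcessedId else newIds.last
  }
}
```

With 84847 as the stored watermark and a new row with item_id = 84848 in X, one call processes 84848 and returns 84848 as the next watermark. Running the notebook that calls pollOnce as a scheduled Databricks job gives the interval-based trigger; the watermark has to be persisted between runs (for example in a small table), which this sketch leaves out.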