I have a table in Snowflake that tracks newly added items, and a downstream data processing workflow that needs to be triggered whenever new items are added. I'm currently using Lakehouse Federation to query the Snowflake tables in Databricks.
How can I set up a mechanism to trigger the downstream data processing step with the newly added items? For example, if table X in Snowflake receives a new insert with item_id = 84848, the workflow should be triggered to run analysis based on this item_id. The trigger can be either interval-based or event-driven.
What would be the best approach to implement this in databricks?
1 Answer
Will you be able to come up with a SQL query that returns only the newly added rows/items? If so, you can refer to this doc to read the result of that query from Snowflake in a Databricks Spark job:
https://docs.snowflake.com/en/user-guide/spark-connector-use#using-the-connector-in-scala
// Source name used by the Snowflake Spark connector; sfOptions is a Map[String, String]
// holding the connection settings (sfURL, sfUser, sfPassword, sfDatabase, sfSchema, sfWarehouse, ...)
val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

// Read the result of an arbitrary query instead of a whole table
val df = sparkSession.read
  .format(SNOWFLAKE_SOURCE_NAME)
  .options(sfOptions)
  .option("query", query)
  .option("autopushdown", "off")
  .load()

// Example from the Snowflake docs: load an aggregation query as a DataFrame
val salaryDf: DataFrame = sparkSession.read
  .format(SNOWFLAKE_SOURCE_NAME)
  .options(sfOptions)
  .option("query", "SELECT DEPT, SUM(SALARY) AS SUM_SALARY FROM T1")
  .load()