
Synapse Parquet Upsert - Stack Overflow


I'm pulling data from PostgreSQL to Azure Data Lake Storage Gen2 using Copy Activity. I'm performing the load incrementally based on a timestamp column. However, this approach is causing duplicate IDs. I'm trying to achieve something like an upsert, but Copy Activity doesn't support it, and since the data is in Parquet format, I can't perform any deletes either.

asked Mar 5 at 20:24 by MR35
  • You can convert your target data to Delta Lake and perform an upsert (MERGE INTO) operation in Synapse (see the Spark SQL sketch after these comments). – Dileep Raj Narayan Thumula Commented Mar 6 at 3:36
  • @DileepRajNarayanThumula Thanks for your answer! I am using Gen2 for the target dataset, and only the Avro, Binary, DelimitedText, Iceberg, JSON, ORC and Parquet formats are available. I wanted to upsert using only one Copy Activity. – MR35 Commented Mar 6 at 16:25
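For reference, the MERGE INTO mentioned in the first comment can be issued as Spark SQL from a Synapse notebook once the target is a Delta table. A minimal sketch, assuming a Delta table named processed_orders is registered in the metastore and the staged increment has already been read into a DataFrame staged_df (both names are hypothetical):

# Hypothetical names: `processed_orders` (a registered Delta table) and
# `staged_df` (a DataFrame holding the staged increment).
staged_df.createOrReplaceTempView("staged_orders")
spark.sql("""
    MERGE INTO processed_orders AS tgt
    USING staged_orders AS src
    ON tgt.id = src.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")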

1 Answer


Azure Data Factory Copy Activity does not support upsert operations directly on a Parquet sink. As a workaround, you can use Copy Activity to load the data incrementally from PostgreSQL into a staging folder in Gen2, then merge the staged increment into a Delta table from a Synapse Spark notebook.
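One detail worth noting: DeltaTable.forPath in the snippet below assumes the Delta table already exists, so the very first run needs a one-time bootstrap from the initial full extract. A minimal sketch, using the same placeholder container/account names as the snippet below:

from delta.tables import DeltaTable

delta_table_path = "abfss://<container>@<account>.dfs.core.windows.net/delta/processed_orders/"

# One-time bootstrap: seed the Delta table from the first full extract
# if it has not been created yet (paths are placeholders).
if not DeltaTable.isDeltaTable(spark, delta_table_path):
    initial_df = spark.read.format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load("abfss://<container>@<account>.dfs.core.windows.net/new/customer_orders.csv")
    initial_df.write.format("delta").save(delta_table_path)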

from delta.tables import DeltaTable

# Read the newly staged increment from ADLS Gen2; `spark` is the session
# predefined in Synapse notebooks (the abfss container/account are placeholders)
df_new = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("abfss://<container>@<account>.dfs.core.windows.net/new/customer_orders.csv")

# Target Delta table that holds the deduplicated data
delta_table_path = "abfss://<container>@<account>.dfs.core.windows.net/delta/processed_orders/"
delta_table = DeltaTable.forPath(spark, delta_table_path)

# Upsert: update rows whose id already exists, insert the rest
delta_table.alias("tgt").merge(
    df_new.alias("src"),
    "tgt.id = src.id"
).whenMatchedUpdate(set={
    "customer_name": "src.customer_name",
    "order_amount": "src.order_amount",
    "order_date": "src.order_date",
    "last_updated": "src.last_updated"
}).whenNotMatchedInsert(values={
    "id": "src.id",
    "customer_name": "src.customer_name",
    "order_amount": "src.order_amount",
    "order_date": "src.order_date",
    "last_updated": "src.last_updated"
}).execute()

# Verify the merged result
df_processed = spark.read.format("delta").load(delta_table_path)
df_processed.show()

The code above reads the staged file from ADLS, loads the target as a Delta table, and performs the upsert (MERGE INTO).
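Since the question keeps the curated data as Parquet, the merged result can also be written back out as plain Parquet for consumers that cannot read Delta. A minimal sketch (the output path is a placeholder):

# Optional: materialize the merged Delta result as plain Parquet files
# for downstream consumers that cannot read Delta (path is a placeholder).
df_processed.write.mode("overwrite") \
    .parquet("abfss://<container>@<account>.dfs.core.windows.net/curated/processed_orders/")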

Results:

+---+-------------+------------+-------------------+-------------------+
| id|customer_name|order_amount|         order_date|       last_updated|
+---+-------------+------------+-------------------+-------------------+
|  1|        Alice|       100.5|2024-03-10 10:00:00|2024-03-10 10:15:00|
|  2|          Bob|      250.75|2024-03-11 14:30:00|2024-03-11 14:45:00|
|  3|      Charlie|       180.0|2024-03-12 08:20:00|2024-03-12 08:35:00|
|  4|        Alice|       300.2|2024-03-13 15:45:00|2024-03-13 15:55:00|
|  5|          Eve|       150.0|2024-03-14 18:10:00|2024-03-14 18:20:00|
+---+-------------+------------+-------------------+-------------------+