
Efficiently updating a single column value for many rows in MS Fabric (azure-synapse / pyspark / delta) - Stack Overflow


I have a data set on a pretty small Microsoft Fabric capacity (which, if you don't know it, is basically Azure Synapse, which is basically Apache Spark).

Due to limitations with the data source, I am basically getting a full dump every day. So I need to update a "last seen" time stamp on rows I already know (i.e. identical data), and append the changed ones.
This is not hard to do, but I am looking for the most efficient option.

Loading the new data in df_update and the existing data in df_existing, I have tried two ways of doing this:

-- 1 -- Using pyspark data frames:

I can solve the task with an outer join like

df_new = df_existing\
    .withColumnRenamed('ts', 'ts_old')\
    .join(df_update, on=all_columns_but_the_timestamp, how='outer')
return df_new\
    .withColumn('ts', coalesce(df_new['ts'], df_new['ts_old']))\
    .drop('ts_old')
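
Writing the result back then looks roughly like this ("Tables/my_table" is a placeholder for my actual lakehouse table path):

# Rough sketch of persisting the joined result (df_new above);
# this rewrites every data file of the table.
df_new.write.format("delta").mode("overwrite").save("Tables/my_table")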

Unfortunately, this requires me to replace the whole table on disk. That's slow and seems to upset OneLake a bit (seeing the updated data in a query takes additional time). Therefore I tried:

-- 2 -- Using delta lake update

By using

df_new = df_update.exceptAll(df_existing.select(all_columns_but_the_timestamp))
df_duplicates = df_update.exceptAll(df_new)

I can get the new and the revisited data. However, updating the revisited rows one at a time like this:

for row in df_duplicates.collect():
    table.update(
            ' AND '.join([f'{k} = "{v}"' for k, v in row.asDict().items()]),
            {'ts': lit(new_timestamp).cast(TimestampType())})

is a woefully slow way to do the updates. df_new can just be appended to the table afterwards.
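
For completeness, appending df_new afterwards is just a one-liner (path again a placeholder):

# Append only the genuinely new rows; the slow part is the update loop above.
df_new.write.format("delta").mode("append").save("Tables/my_table")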

I have looked for

-- 3 -- Delta lake update in bulk

Somehow selecting all affected rows in one go and updating the value:

table.update(
    some_very_neat_condition,
    {'ts': lit(new_timestamp).cast(TimestampType())})

However, since I don't have reliable IDs, I don't know how to do that.

Or is there another option I'm missing?

asked Mar 28 at 15:38 by Jörg Neulist
  • 1. Is your target table a delta table? If not what kind is it? 2. Are you using Spark as your engine? 3. What % of your data do you think changes in each update, roughly? 4. "...basically Azure Synapse, which is basically Apache Spark" AFAIK this is not true. You have an option to choose Spark or "SQL pool" or "Data Explorer pool" as your compute/engine. And how you interact depends on your engine. See learn.microsoft/en-us/azure/synapse-analytics/… – Kashyap Commented Mar 28 at 17:04
  • @Kashyap 1. It's a delta table 2. Yes 3. Can be close to 100%, will decrease over time 4. You're right, that was a bit too simplistic. The idea was to point out that it's delta lake running in a spark cluster. – Jörg Neulist Commented Mar 31 at 6:42

2 Answers


Q: I have a data set on pretty small Microsoft Fabric capacity (which, if you don't know is basically Azure Synapse, which is basically Apache Spark).

Due to limitations with the data source, I am basically getting a full dump every day. So I need to update a "last seen" time stamp on rows I already know (i.e. identical data), and append the changed ones.

If I understand correctly, you are trying to merge, i.e. insert or update.

Use MERGE INTO whenever possible; even traditional databases have an equivalent SQL MERGE statement (a sketch of the SQL form follows the code below).

from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp

 
delta_table = DeltaTable.forPath(spark, "your tablehere..")

 
delta_table.alias("existing").merge(
    df_update.alias("updates"),
    " AND ".join([f"existing.{col} = updates.{col}" for col in all_columns_but_the_timestamp])
).whenMatchedUpdate(set={
    "ts": "current_timestamp()"  
}).whenNotMatchedInsert(values={
    **{col: f"updates.{col}" for col in all_columns_but_the_timestamp},
    "ts": "current_timestamp()"  
}).execute()
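
For reference, a sketch of the same merge expressed through Spark SQL (the table name my_table and the columns col_a/col_b are placeholders for the real table and the non-timestamp columns):

# Hypothetical SQL form of the merge above, run from PySpark.
df_update.createOrReplaceTempView("updates")
spark.sql("""
    MERGE INTO my_table AS existing
    USING updates
    ON  existing.col_a = updates.col_a
    AND existing.col_b = updates.col_b
    WHEN MATCHED THEN
      UPDATE SET existing.ts = current_timestamp()
    WHEN NOT MATCHED THEN
      INSERT (col_a, col_b, ts)
      VALUES (updates.col_a, updates.col_b, current_timestamp())
""")
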
  1. This is the bread-and-butter use case for DeltaTable.merge(), as Ram mentioned, so something like what he suggested in his answer. See the Delta Lake documentation on merge for more details.
  2. IMO you should add a key column to your table and use it in the condition param of your merge() call. Historically, every time someone says there is no unique key, they either don't know the data or they haven't tried hard enough. In any case, assuming your rows are uniquely identified by the composite key of all_columns_but_the_timestamp, you could:
from pyspark.sql import functions as F

df_update = df_update.withColumn(
    'all_columns_str',
    F.concat(*all_columns_but_the_timestamp)  # in practice this would be more complex as you'll
                                              # have to convert all columns to str, handle NULLs, ...
).withColumn(
    'generated_key',
    F.conv(F.sha2('all_columns_str', 256), 16, 10)
).drop('all_columns_str')
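
Expanding on the caveat in that comment, a fuller (still hypothetical) sketch of the key derivation casts every column to string and gives NULLs an explicit placeholder, which reduces collisions across column boundaries and keeps the concatenation from becoming NULL:

from pyspark.sql import functions as F

normalized_cols = [
    F.coalesce(F.col(c).cast('string'), F.lit('<NULL>'))
    for c in all_columns_but_the_timestamp
]
df_update = df_update.withColumn(
    'generated_key',
    F.conv(F.sha2(F.concat_ws('||', *normalized_cols), 256), 16, 10)
)
# F.xxhash64(*all_columns_but_the_timestamp) would be a simpler alternative
# that yields a numeric hash directly.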

and then:

delta_table.alias("existing").merge(
    source = df_update.alias("updates"),
    condition = 'existing.generated_key = updates.generated_key'
).whenMatchedUpdate(set={
    "ts": "current_timestamp()"  
}).whenNotMatchedInsert(values={
    **{col: f"updates.{col}" for col in all_columns_but_the_timestamp},
    "generated_key": "updates.generated_key",  # keep the key column populated so later merges can match
    "ts": "current_timestamp()"
}).execute()
  3. Not sure how you're partitioning your table. If you add a hash as a key, then partitioning becomes easy. Let's say you decide that 128 is your sweet spot for the number of partitions; then:
df_update = df_update.withColumn('partition_id', F.col('generated_key') % 128)

and use partition_id as the partitioning column while creating your delta table.
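
Creating (or rewriting) the Delta table with that partition column might then look roughly like this ("Tables/my_table" is a placeholder for the real lakehouse path):

# One-time (re)creation of the target table partitioned by partition_id.
df_update.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("partition_id") \
    .save("Tables/my_table")

Including the partition column in the merge condition as well (existing.partition_id = updates.partition_id) lets Delta prune files during the merge instead of scanning the whole table.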


Also, none of the options listed in the OP are a good fit for this use case, so do not use them.
