I have a data set on a pretty small Microsoft Fabric capacity (which, if you don't know it, is basically Azure Synapse, which is basically Apache Spark).
Due to limitations of the data source, I basically get a full dump every day. So I need to update a "last seen" timestamp on rows I already know (i.e. identical data) and append the changed ones.
This is not hard to do, but I am looking for the most efficient option.
Loading the new data into df_update and the existing data into df_existing, I have tried two ways of doing this:
-- 1 -- Using PySpark data frames:
I can solve the task with an outer join like this:
from pyspark.sql.functions import coalesce

df_new = df_existing \
    .withColumnRenamed('ts', 'ts_old') \
    .join(df_update, on=all_columns_but_the_timestamp, how='outer')
return df_new \
    .withColumn('ts', coalesce(df_new['ts'], df_new['ts_old'])) \
    .drop('ts_old')
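Writing the result back is then a full overwrite of the Delta table. A minimal sketch of that step, where df_result stands for the frame returned above and table_path is a placeholder for the actual OneLake table location (both names are mine, not from the code above):

# Sketch only: overwrite the whole Delta table with the joined result.
# df_result is the returned frame; table_path is a hypothetical placeholder.
df_result.write.format("delta").mode("overwrite").save(table_path)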
Unfortunately, this requires me to replace the whole table on disk. That's slow and seems to upset OneLake a bit (seeing the updated data in a query takes additional time). Therefore I tried:
-- 2 -- Using Delta Lake update
By using
df_new = df_update.exceptAll(df_existing.select(all_columns_but_the_timestamp))  # genuinely new or changed rows
df_duplicates = df_update.exceptAll(df_new)                                      # rows already known (identical data)
I can get the new and the revisited data.
from pyspark.sql.functions import lit
from pyspark.sql.types import TimestampType

# One DeltaTable.update() call per revisited row -- this is the slow part.
for row in df_duplicates.collect():
    table.update(
        ' AND '.join([f'{k} = "{v}"' for k, v in row.asDict().items()]),
        {'ts': lit(new_timestamp).cast(TimestampType())})
This loop, however, is a woefully slow way to do the updates. df_new can then simply be appended to the table afterwards.
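A sketch of that append, assuming the new rows still need the timestamp column added and with table_path again as a placeholder for the actual table location:

# Sketch: stamp the genuinely new rows and append them; existing files are left untouched.
# table_path is a hypothetical placeholder for the real table location.
df_new \
    .withColumn('ts', lit(new_timestamp).cast(TimestampType())) \
    .write.format("delta").mode("append").save(table_path)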
I have looked for:
-- 3 -- Delta Lake update in bulk
Somehow selecting all affected rows in one go and updating the value:
table.update(
    some_very_neat_condition,
    {'ts': lit(new_timestamp).cast(TimestampType())})
Since I don't have reliable IDs, I don't know how to do that, however.
Or is there another option I'm missing?
asked Mar 28 at 15:38 by Jörg Neulist

2 Answers
If I understand correctly, you are trying to merge, i.e. insert or update.
Use MERGE INTO whenever possible... even traditional databases have an equivalent SQL statement (see the sketch after the code below):
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "your tablehere..")

delta_table.alias("existing").merge(
    df_update.alias("updates"),
    " AND ".join([f"existing.{col} = updates.{col}" for col in all_columns_but_the_timestamp])
).whenMatchedUpdate(set={
    "ts": "current_timestamp()"
}).whenNotMatchedInsert(values={
    **{col: f"updates.{col}" for col in all_columns_but_the_timestamp},
    "ts": "current_timestamp()"
}).execute()
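For reference, the same operation can be written as the SQL MERGE INTO statement mentioned above. A sketch, assuming the target Delta table is registered as my_table and the daily dump is exposed as a temp view named updates (both names are placeholders, not from the original post):

# Hypothetical names: my_table is the target Delta table, updates is a temp view over df_update.
df_update.createOrReplaceTempView("updates")

on_clause = " AND ".join(
    f"existing.{col} = updates.{col}" for col in all_columns_but_the_timestamp
)
insert_cols = ", ".join(all_columns_but_the_timestamp)
insert_vals = ", ".join(f"updates.{col}" for col in all_columns_but_the_timestamp)

spark.sql(f"""
    MERGE INTO my_table AS existing
    USING updates
    ON {on_clause}
    WHEN MATCHED THEN UPDATE SET existing.ts = current_timestamp()
    WHEN NOT MATCHED THEN INSERT ({insert_cols}, ts) VALUES ({insert_vals}, current_timestamp())
""")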
- This is the bread-and-butter use case for DeltaTable.merge(), as Ram mentioned, so something like what Ram suggested in his answer. More docs here.
- IMO you should add a key column to your table and use that in the condition param of your merge() call. Historically, every time someone says there is no unique key, they either don't know the data or they haven't tried hard enough. In any case, assuming your row is uniquely identified by a composite key of all_columns_but_the_timestamp, you could:
from pyspark.sql import functions as F

df_update = df_update.withColumn(
    'all_columns_str',
    F.concat(*all_columns_but_the_timestamp)  # in practice this would be more complex as you'll
                                              # have to convert all columns to str, handle NULLs, ...
).withColumn(
    'generated_key',
    F.conv(F.sha2('all_columns_str', 256), 16, 10)
).drop('all_columns_str')
and then:
delta_table.alias("existing").merge(
source = df_update.alias("updates"),
condition = 'existing.generated_key = updates.generated_key'
).whenMatchedUpdate(set={
"ts": "current_timestamp()"
}).whenNotMatchedInsert(values={
**{col: f"updates.{col}" for col in all_columns_but_the_timestamp},
"ts": "current_timestamp()"
}).execute()
- Not sure how you're partitioning your table. If you add a hash as a key, then partitioning becomes easy. Let's say you decided that 128 is your sweet spot for the number of partitions; then:
df_update = df_update.withColumn('partition_id', F.col('generated_key') % 128)
and use partition_id as the partitioning column while creating your Delta table (a sketch of that write follows below).
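A minimal sketch of that initial write, where table_path is a placeholder for the target location (not a name from the answer):

# Sketch: create the Delta table partitioned by the hash-derived bucket.
# table_path is a hypothetical placeholder for the target location.
df_update.write \
    .format("delta") \
    .partitionBy("partition_id") \
    .mode("overwrite") \
    .save(table_path)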
Also, none of the options listed in the OP are a good fit for this use case, so do not use them.
"...basically Azure Synapse, which is basically Apache Spark"
AFAIK this is not true. You have the option to choose Spark, a "SQL pool", or a "Data Explorer pool" as your compute/engine, and how you interact depends on that engine. See learn.microsoft/en-us/azure/synapse-analytics/… – Kashyap Commented Mar 28 at 17:04