I'm trying to implement the PySpark code below to read Delta files saved in the data lake (delta_table) and merge them with a DataFrame of updated records (novos_registros).
# 5. Build the matching condition for the MERGE
condicao_correspondencia = " AND ".join([f"t.{col} = s.{col}" for col in chave_unica])

# 6. Use the MERGE command to update or insert data
delta_table = DeltaTable.forPath(spark, delta_path)

delta_table.alias("t").merge(
    novos_registros.alias("s"),
    condicao_correspondencia  # Dynamically generated match condition
).whenMatchedUpdate(
    set={col: f"s.{col}" for col in novos_registros.columns}     # Update all fields of novos_registros
).whenNotMatchedInsert(
    values={col: f"s.{col}" for col in novos_registros.columns}  # Insert all fields of novos_registros
).execute()
However, I'm running into a field-type incompatibility: Spark says that a certain field in the Delta table is a complex type, while the same field is a string in the DataFrame. This is the error I get:
AnalysisException: [INVALID_EXTRACT_BASE_FIELD_TYPE] Can't extract a value from "0NET_PRICE". Need a complex type [STRUCT, ARRAY, MAP] but got "STRING".
Apparently, DeltaTable.forPath(spark, delta_path) picks up the field types from the stored table, while the DataFrame fields are all strings. This is a generic script and I don't want to hard-code the field types; it should stay dynamic.
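For diagnosis, here is a minimal sketch of how I compared the two schemas (assuming spark, delta_path, and novos_registros are defined as in the snippet above; the variable names are just illustrative):

from delta.tables import DeltaTable

# Target (Delta) schema vs. source (DataFrame) schema, side by side
tabela_destino = DeltaTable.forPath(spark, delta_path).toDF()
for campo in tabela_destino.schema.fields:
    print(campo.name, campo.dataType)
novos_registros.printSchema()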
Thanks in advance.
Comments:
- The error occurs because Delta Lake keeps the schema of the stored data, while your novos_registros DataFrame contains all string fields. – Dileep Raj Narayan Thumula
- Thanks @DileepRajNarayanThumula. Do you know any workaround that could help with this situation? – Marcelo Herdy
1 Answer
For anyone who encounters this in the future, here's how we solved it.
The problem occurred because we have a field called 0PRICE.CURRENCY, and the command was misinterpreting it: the dot in the name makes Spark parse s.0PRICE.CURRENCY as an attempt to extract a CURRENCY field from the string column 0PRICE, which is exactly what triggers the INVALID_EXTRACT_BASE_FIELD_TYPE error.
We wrapped every field name in backticks (`) so the whole name is treated as a single identifier, and that solved it.
# Load the Delta table
delta_table = DeltaTable.forPath(spark, abfss_path_target)

# Build the match condition (backticks also protect key columns with dots in their names)
condicao_correspondencia = " AND ".join([f"t.`{chave}` = s.`{chave}`" for chave in chave_unica])

# Use the MERGE command to update or insert data
# lista_campos is the list of source column names (e.g. novos_registros.columns)
delta_table.alias("t").merge(
    novos_registros.alias("s"),
    condicao_correspondencia  # Use the dynamically built match condition
).whenMatchedUpdate(
    set={f"`{col}`": f"s.`{col}`" for col in lista_campos}
).whenNotMatchedInsert(
    values={f"`{col}`": f"s.`{col}`" for col in lista_campos}
).execute()
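As a small illustration of why the backticks matter (the column names below are made up to mimic the SAP-style fields above): without backticks, Spark parses the dot as struct-field access on a string column, which is the same AnalysisException as in the question; with backticks, the whole name is resolved as a single column identifier.

# Toy DataFrame with a plain string column and a dotted column name (illustrative)
df = spark.createDataFrame(
    [("100.00", "EUR")],
    ["0NET_PRICE", "0NET_PRICE.CURRENCY"],
)

df.select("`0NET_PRICE.CURRENCY`").show()  # backticks: resolves the column literally named 0NET_PRICE.CURRENCY
# df.select("0NET_PRICE.CURRENCY")         # no backticks: tries to extract field CURRENCY
#                                          # from the string column 0NET_PRICE -> AnalysisException

Quoting every column name defensively in the MERGE keeps the script generic, since the backticks are harmless for plain names and required for dotted ones.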