I am facing an issue with an Azure Data Factory pipeline that fails with the following error:
Error code: DFExecutorUserError
Failure type: User configuration issue
Details: Job failed due to reason: com.microsoft.dataflow.Issues: DF-DSL-002 - Parameter stream has parsing errors Parameter(s) with errors: query. Not honoring the datatype of parameter(s) could be one of the causes.
Activity ID: 19fdb4db-846a-4920-8cdb-3292e872fdba
The pipeline in question consists of the following activities:
Get Metadata and Last Incremental Time: A Lookup activity that retrieves metadata and the last incremental timestamp from an Azure PostgreSQL database. The metadata includes a query that the next step executes, together with the fetched timestamp, to extract the incremental data.
Data Flow: A Data Flow activity that processes the data. This is the ETL part of the pipeline.
Update Incremental Timestamp: A lookup activity that updates the last processed timestamp.
The Get Metadata and Last Incremental Time activity runs the following query:
SELECT
to_char(t.incremental_timestamp, 'YYYY-MM-DDThh:mm:ssZ') AS incremental_timestamp,
m.source_entity AS source_object,
m.query
FROM
adf_control.interface_incremantal_timestamps t
INNER JOIN adf_control.interface_metadata m
ON t.interface_id = m.interface_id
WHERE
m.interface_id = '@{pipeline().parameters.interfaceId}';
Note: interfaceId is a pipeline parameter.
The Data Flow activity takes its parameters dynamically from the output of the first activity:
"parameters": {
"query": {
"value": "'@{activity('Get Metadata and Last Incremental Time').output.firstRow.query}'",
"type": "Expression"
},
"incrementalTimestamp": {
"value": "'@{activity('Get Metadata and Last Incremental Time').output.firstRow.incremental_timestamp}'",
"type": "Expression"
}
}
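To make my first question more concrete, here is a minimal Python sketch (not ADF code) of what I suspect could happen when the looked-up query is wrapped in single quotes. It only mimics the '@{...}' string interpolation; the placeholder name, the helper functions, and both sample queries are hypothetical, and the backslash-escape rule is an assumption based on the \" escapes used in my source expression.

```python
def interpolate(template: str, values: dict) -> str:
    """Mimic string interpolation: replace each @{name} with its value."""
    out = template
    for name, value in values.items():
        out = out.replace("@{" + name + "}", value)
    return out


def is_single_quoted_literal(text: str) -> bool:
    """Return True if text is one '...' literal with no unescaped inner quote.

    Assumption: a backslash escapes the character that follows it.
    """
    if len(text) < 2 or text[0] != "'" or text[-1] != "'":
        return False
    inner = text[1:-1]
    i = 0
    while i < len(inner):
        if inner[i] == "\\":
            if i + 1 >= len(inner):
                return False  # the backslash would escape the closing quote
            i += 2
            continue
        if inner[i] == "'":
            return False  # an inner quote ends the literal too early
        i += 1
    return True


# Same shape as the pipeline expression, with a simplified placeholder name.
TEMPLATE = "'@{query}'"

# Hypothetical metadata queries, not the real ones from my control table.
PLAIN_QUERY = "SELECT p.* FROM products p"
QUOTED_QUERY = "SELECT p.*, 'erp' AS source FROM products p"

if __name__ == "__main__":
    for query in (PLAIN_QUERY, QUOTED_QUERY):
        wrapped = interpolate(TEMPLATE, {"query": query})
        print(is_single_quoted_literal(wrapped), wrapped)
```

If the real metadata query contains a single quote, the wrapped value would no longer be a single string literal, which might explain a parsing error on the query parameter. I have not confirmed that this is how the Data Flow parser actually behaves.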
The data flow queries an Azure PostgreSQL table and processes the records before sending them to an API sink.
The query parameter in the source transformation is defined as:
$query + " WHERE CAST(p.\"lastUpdateDate\" AS BIGINT) > EXTRACT(EPOCH FROM timestamptz '" + $incrementalTimestamp + "') * 1000" + " LIMIT 5 OFFSET 0"
Note: The limit will be removed once this pipeline becomes testable.
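For reference, this is the SQL text I expect the source expression to produce once both parameters are resolved. The Python sketch below only reproduces the string concatenation outside of ADF; the function name and the sample inputs are hypothetical.

```python
from typing import Optional


def build_source_query(query: str, incremental_timestamp: str,
                       limit: Optional[int] = 5) -> str:
    """Concatenate the pieces in the same order as the data flow expression."""
    sql = (
        query
        + ' WHERE CAST(p."lastUpdateDate" AS BIGINT) > '
        + "EXTRACT(EPOCH FROM timestamptz '"
        + incremental_timestamp
        + "') * 1000"
    )
    if limit is not None:
        # Temporary limit used while testing the pipeline.
        sql += f" LIMIT {limit} OFFSET 0"
    return sql


if __name__ == "__main__":
    # Hypothetical inputs standing in for the Lookup activity output.
    print(build_source_query("SELECT * FROM products p", "2024-01-01T00:00:00Z"))
```

Note that the concatenation appends its own WHERE clause, so it only yields valid SQL if the metadata query does not already end with one.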
Troubleshooting Steps Taken:
Verified that activity('Get Metadata and Last Incremental Time').output.firstRow.query returns a valid SQL string.
Checked that incremental_timestamp is correctly formatted.
Ensured that the query parameter is enclosed in proper quotes.
Ran the query manually on PostgreSQL without issues.
Hard-coded the query in both the data flow and the pipeline; it ran without any issues in both places.
Questions:
Could the issue be due to improper data type handling when passing the query as a parameter?
Is there a recommended way to pass dynamic SQL queries as parameters in Azure Data Factory?
Are there any restrictions on dynamic SQL query usage in Data Flows?
Any guidance would be greatly appreciated!