最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

postgresql - Azure Data Factory Data Flow Error: DF-DSL-002 - Parameter stream has parsing errors - Stack Overflow

programmeradmin5浏览0评论

I am facing an issue with an Azure Data Factory pipeline that fails with the following error:

Error code: DFExecutorUserError
Failure type: User configuration issue
Details: Job failed due to reason: com.microsoft.dataflow.Issues: DF-DSL-002 - Parameter stream has parsing errors Parameter(s) with errors: query. Not honoring the datatype of parameter(s) could be one of the causes.
Activity ID: 19fdb4db-846a-4920-8cdb-3292e872fdba

The concerned pipeline consists of the following activities:

  1. Get Metadata and Last Incremental Time: A lookup activity that retrieves metadata and incremental timestamps from an Azure PostgreSQL database. This metadata includes a query that needs to be executed in the next step using the timestamp that is fetched along with it to extract the incremental data.

  2. Data Flow: A Data Flow activity that processes the data. This is the ETL part of the pipeline.

  3. Update Incremental Timestamp: A lookup activity that updates the last processed timestamp.

The Get Metadata and Last Incremental Time activity runs the following query:

SELECT
    to_char(t.incremental_timestamp, 'YYYY-MM-DDThh:mm:ssZ') AS incremental_timestamp,
    m.source_entity AS source_object,
    m.query
FROM
    adf_control.interface_incremantal_timestamps t
    INNER JOIN adf_control.interface_metadata m
        ON t.interface_id = m.interface_id
WHERE
    m.interface_id = '@{pipeline().parameters.interfaceId}';

Note: interfaceId is a pipeline parameter.

The Data Flow dynamically takes parameters from the step #1:

"parameters": {
    "query": {
        "value": "'@{activity('Get Metadata and Last Incremental Time').output.firstRow.query}'",
        "type": "Expression"
    },
    "incrementalTimestamp": {
        "value": "'@{activity('Get Metadata and Last Incremental Time').output.firstRow.incremental_timestamp}'",
        "type": "Expression"
    }
}

The data flow queries an Azure PostgreSQL table and processes the records before sending them to an API sink.

The query parameter in the source transformation is defined as:

$query + " WHERE CAST(p.\"lastUpdateDate\" AS BIGINT) > EXTRACT(EPOCH FROM timestamptz '" + $incrementalTimestamp + "') * 1000" + " LIMIT 5 OFFSET 0"

Note: The limit will be removed once this pipeline becomes testable.

Troubleshooting Steps Taken:

  • Verified that activity('Get Metadata and Last Incremental Time').output.firstRow.query returns a valid SQL string.

  • Checked that incremental_timestamp is correctly formatted.

  • Ensured that the query parameter is enclosed in proper quotes.

  • Ran the query manually on PostgreSQL without issues.

  • Hard coded the query in both the data flow and the pipeline and the query ran without any issues in both places

Questions:

  • Could the issue be due to improper data type handling when passing the query as a parameter?

  • Is there a recommended way to pass dynamic SQL queries as parameters in Azure Data Factory?

  • Are there any restrictions on dynamic SQL query usage in Data Flows?

Any guidance would be greatly appreciated!

发布评论

评论列表(0)

  1. 暂无评论