
DatabricksWorkflowTaskGroup and templated task parameters for spark_jar task


from airflow.providers.databricks.operators.databricks import DatabricksTaskOperator
from airflow.providers.databricks.operators.databricks_workflow import DatabricksWorkflowTaskGroup

task_group = DatabricksWorkflowTaskGroup(
    group_id="test",
    databricks_conn_id=dbx_conn_id,
    job_clusters=[
        {
            "job_cluster_key": "cpu_cluster",
            "new_cluster": {
                "spark_version": "15.3.x-scala2.12",
                "node_type_id": "cgd-fleet.xlarge",
                "driver_node_type_id": "cgd-fleet.xlarge",
                "num_workers": 1,
            },
        }
    ],
)

with task_group:
    a_task = DatabricksTaskOperator(
        task_id="jar_task",
        databricks_conn_id=dbx_conn_id,
        job_cluster_key="cpu_cluster",
        task_config={
            "spark_jar_task": {
                "main_class_name": "com.test.etl.ETLApplication",
                # templated value that reaches Databricks unrendered
                "parameters": [
                    "{{ var.conf.paramter1 }}",
                ],
            },
            "libraries": lib_path,
        },
    )

I pass a templated variable {{ var.conf.paramter1 }} to my DatabricksTaskOperator and expect it to be rendered at run time. In the Databricks workflow execution, however, I see the literal template string (e.g. {{ var.json.table_name }}) instead of the actual value. I need to send different arguments to different tasks, so I do not want to pass them at the workflow task group level; I have also observed that even when I do pass them at the task group level, the templated variables are not rendered.
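For comparison, the same kind of expression renders fine in a field that is declared in an operator's template_fields. A minimal sketch with BashOperator (whose bash_command is a standard templated field):

from airflow.operators.bash import BashOperator

# bash_command is in BashOperator.template_fields, so the Jinja expression
# is rendered at run time; the identical expression inside task_config on
# DatabricksTaskOperator reaches Databricks as a literal string.
check = BashOperator(
    task_id="check_render",
    bash_command="echo {{ var.value.get('paramter1', 'missing') }}",
)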

Following are the versions used:

  1. Airflow: 2.10.4
  2. apache-airflow-providers-databricks==7.0.0

Can anyone help on this, please?


asked Feb 7 at 11:32 by Anil Reddaboina
  • Give the parameters properly, as described in the Databricks REST API documentation for tasks. – JayashankarGS Commented yesterday
  • Thanks, @JayashankarGS, for your response. We are good with the syntax; my question is that I need to send these values as template fields and have them rendered at run time. Per your example, I want to access the values using the template-fields concept, like this: "spark_jar_task": { "main_class_name": "com.databricks.Sessionize", "parameters": [ "{{ get_paramter_valueforjob1('test-env') }}", "{{ get_data_path() }}" ] } (see the macro-registration sketch after these comments). – Anil Reddaboina Commented 22 hours ago
  • Yes. Are you getting an error when you try [ "{{ get_paramter_valueforjob1('test-env') }}", "{{ get_data_path() }}" ]? – JayashankarGS Commented 21 hours ago
  • I will check on my end and update you. Meanwhile, try accessing the values from var, e.g. var.conf.paramter1, for both the parameter name and the parameter value. – JayashankarGS Commented 21 hours ago
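
For the helper functions quoted in these comments to be usable inside a template at all, they have to be registered as Jinja macros on the DAG. A minimal sketch (get_data_path is the hypothetical helper from the comment thread, not part of the provider):

import pendulum
from airflow import DAG

def get_data_path() -> str:
    # hypothetical helper from the comment thread
    return "dbfs:/path/to/data.json"

with DAG(
    dag_id="dbx_workflow_example",
    start_date=pendulum.datetime(2025, 1, 1),
    schedule=None,
    # functions must be registered here before "{{ get_data_path() }}"
    # can be resolved inside a templated field
    user_defined_macros={"get_data_path": get_data_path},
):
    ...

Note this only makes the macro available; whether the field is rendered at all is still governed by the operator's template_fields.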

1 Answer


Since Airflow calls the Databricks REST API, as mentioned here, you need to pass the parameters according to the REST API documentation.

For spark_jar_task parameters, you give them like below:

"spark_jar_task": {
        "main_class_name": "com.databricks.Sessionize",
        "parameters": [
          "--data",
          "dbfs:/path/to/data.json"
        ]
      },
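
In the DAG, this payload goes under the operator's task_config, which the provider forwards to the REST API. A minimal sketch reusing the question's connection and cluster key (com.databricks.Sessionize and the dbfs: path are the example values from the REST docs):

from airflow.providers.databricks.operators.databricks import DatabricksTaskOperator

# task_config mirrors the REST API task payload shown above; literal
# strings here are passed straight through to Databricks.
a_task = DatabricksTaskOperator(
    task_id="jar_task",
    databricks_conn_id=dbx_conn_id,
    job_cluster_key="cpu_cluster",
    task_config={
        "spark_jar_task": {
            "main_class_name": "com.databricks.Sessionize",
            "parameters": ["--data", "dbfs:/path/to/data.json"],
        },
    },
)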

For job-level parameters, give them as a list of dictionaries, like below:

  "parameters": [
    {
      "default": "users",
      "name": "table"
    }
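
If these job-level parameters should be attached to the job that DatabricksWorkflowTaskGroup generates, one option is extra_job_params, which the provider merges into the generated job definition. A hedged sketch (whether the "parameters" key survives the merge in provider 7.0.0 is an assumption to verify):

# Sketch: job-level parameters via extra_job_params; confirm this against
# your provider version before relying on it.
task_group = DatabricksWorkflowTaskGroup(
    group_id="test",
    databricks_conn_id=dbx_conn_id,
    job_clusters=[
        {
            "job_cluster_key": "cpu_cluster",
            "new_cluster": {
                "spark_version": "15.3.x-scala2.12",
                "node_type_id": "cgd-fleet.xlarge",
                "num_workers": 1,
            },
        }
    ],
    extra_job_params={
        "parameters": [
            {"name": "table", "default": "users"},
        ],
    },
)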

For more information, refer to this.
