slack - Is there a way to send Airflow error logs to a SlackAPIPostOperator? - Stack Overflow

All task failures result in a Slack notification being sent by way of on_failure_callback.

I would like to include the error message that Airflow reports. I tried using context['exception'] but that was blank. I searched, but I can't find anything online.

Has anyone figured out how to send Airflow error logs to a SlackAPIPostOperator?

Edit: My code is the following:

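from airflow.models import Variable
from airflow.providers.slack.operators.slack import SlackAPIPostOperator


def task_failure_alert(context):
    # The "Environment" Airflow Variable decides whether Slack is actually notified.
    env = Variable.get("Environment", default_var="Sandbox")
    failed_alert = SlackAPIPostOperator(
        task_id="DAG_failed",
        channel="#data-alerts",
        slack_conn_id="slack",
        text=f":red_circle: <!subteam^::redacted::> Task {context['task_instance_key_str']} failed for {context['dag']}. To learn more, click here: {context['task_instance'].log_url}",
    )
    if env == "Production":
        # Run the operator directly inside the callback to post the message.
        return failed_alert.execute(context)
    else:
        # Outside Production, only print the failure.
        print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")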

I'd like to be able to add something like context['exception'] to help log the error as well, but that doesn't seem to be working.
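
For illustration, here is a minimal sketch of the kind of message building described above. It assumes the failure callback's context exposes the raised error under the "exception" key, which is what the attempt with context['exception'] relies on. The helper names describe_exception and build_failure_text and the MAX_ERROR_CHARS cap are hypothetical, not part of Airflow or the Slack provider. The helpers take a plain mapping, so they can be tried outside Airflow.

```python
import traceback
from typing import Any, Mapping

# Hypothetical cap so a long error does not flood the Slack channel.
MAX_ERROR_CHARS = 1500


def describe_exception(context: Mapping[str, Any]) -> str:
    """Return a short, Slack-friendly description of the failure, if any."""
    # Assumption: the callback context stores the error under "exception".
    exc = context.get("exception")
    if exc is None:
        return "(no exception found in the callback context)"
    if isinstance(exc, BaseException):
        # Formats only the final "ErrorType: message" part, not the full stack.
        summary = "".join(traceback.format_exception_only(type(exc), exc)).strip()
    else:
        # Assumption: the value might also arrive as a plain string.
        summary = str(exc).strip()
    if len(summary) > MAX_ERROR_CHARS:
        summary = summary[:MAX_ERROR_CHARS] + "..."
    return summary or "(exception had an empty message)"


def build_failure_text(context: Mapping[str, Any]) -> str:
    """Build the Slack message text, including the error summary."""
    return (
        f":red_circle: Task {context.get('task_instance_key_str')} failed.\n"
        f"Error: {describe_exception(context)}"
    )
```

In task_failure_alert, the result of build_failure_text(context) could be passed as the text argument of SlackAPIPostOperator. Using context.get rather than context['exception'] avoids a KeyError when the key is absent, and the fallback strings make an empty value visible in the alert instead of leaving a blank.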

Edit 2: The DAG itself is defined as:

@dag(
    start_date=datetime(2024, 11, 15),
    schedule="0 11 * * *",
    catchup=False,
    concurrency=3,
    max_active_runs=3,
    default_args={
        "on_success_callback": task_success_alert,
        "on_failure_callback": task_failure_alert,
    },
)