All of task failures result in a slack notification being sent by way of on_failure_notifiy
.
I would like to include the error message that Airflow reports. I tried using context['exception'] but that was blank. I searched, but I can't find anything online.
Has anyone figured out how to send Airflow error logs to a SlackAPIPostOperator?
Edit: My code is the following:
def task_failure_alert(context):
env = Variable.get("Environment", default_var="Sandbox")
failed_alert = SlackAPIPostOperator(
task_id="DAG_failed",
channel="#data-alerts",
slack_conn_id="slack",
text=f":red_circle: <!subteam^::redacted::> Task {context['task_instance_key_str']} failed for {context['dag']}. To learn more, click here: {context['task_instance'].log_url}",
)
if env == "Production":
return failed_alert.execute(context)
else:
print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")
I'd like to be able to add something like context['exception']
to help log the error as well, but that doesn't seem to be working.
Edit2:
The DAG itself is defined as
@dag(
start_date=datetime(2024, 11, 15),
schedule="0 11 * * *",
catchup=False,
concurrency=3,
max_active_runs=3,
default_args={
"on_success_callback": task_success_alert,
"on_failure_callback": task_failure_alert,
},
)