I have below sample Python DAG script for Apache Airflow. It is meant for producing messages to two Kafka topics. The Kafka credentials are kept in HC Vault and every time the script runs, it is supposed to fetch latest credential set from Vault. However, it is not happening - here, db.merge_conn() does not update the conn_id kafka_connect_2 with latest Kafka config (say, if there is any update in username, password etc.).
from __future__ import annotations
import json
import logging
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
from airflow.operators.python import PythonOperator
from airflow.models import Connection
from airflow.utils import db
from functools import partial
import hvac
from hvac.api.auth_methods import AppRole
# Setting up logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
# Default arguments
default_args = {
"owner": "airflow",
"depend_on_past": False,
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
# Vault connection constants
ROLE_ID = "<role_id>"
SECRET_ID = "<secret_id>"
VAULT_ADDRESS = ";
NAMESPACE = "<namespace>"
SECRET_PATH = "<secret_path>"
MOUNT_POINT = "<mount_point>"
# Constants for task_id and topic names
TOPIC_CSG_INSIGHT_SA_NOTIFY_EVENT_DEV = "TOPIC_SA_NOTIFY_EVENT"
TOPIC_CSG_INSIGHT_MATERIALIZED_VIEW_REFRESH_DEV = "TOPIC_MAT_VIEW_REFRESH"
TASK_ID_CSG_INSIGHT_SA_NOTIFY_EVENT_DEV = "produce_to_csg_insight_sa_notify_event_dev"
TASK_ID_CSG_INSIGHT_MATERIALIZED_VIEW_REFRESH_DEV = "produce_to_csg_insight_materialized_view_refresh_dev"
def get_vault_secret():
"""Retrieve Kafka configuration from Vault."""
try:
client = hvac.Client(url=VAULT_ADDRESS, verify=False, namespace=NAMESPACE)
approle = AppRole(client.adapter)
auth_response = approle.login(role_id=ROLE_ID, secret_id=SECRET_ID, mount_point="approle")
client.token = auth_response["auth"]["client_token"]
if not client.is_authenticated():
logger.error("Failed to authenticate with Vault.")
return None
logger.info("Successfully authenticated with Vault.")
response = client.secrets.kv.v2.read_secret(path=SECRET_PATH, mount_point=MOUNT_POINT)
if "data" in response:
return response["data"]["data"]
else:
logger.error("Failed to retrieve secret from Vault.")
return None
except Exception as e:
logger.error(f"Error while retrieving secrets from Vault: {e}")
return None
def load_connections():
"""Load or update the Kafka connection in Airflow."""
# Fetch the latest Kafka configurations from Vault
config_data = get_vault_secret()
if not config_data:
raise ValueError("Failed to fetch configuration from Vault.")
# Update the Kafka connection in Airflow
db.merge_conn(
Connection(
conn_id="kafka_connect_2",
conn_type="kafka",
extra=json.dumps({
"bootstrap.servers": config_data["bootstrap.servers.config"],
"security.protocol": config_data["kafka.security.protocol"],
"sasl.mechanism": config_data["sasl.mechanism"],
"sasl.username": config_data["sasl.username"],
"sasl.password": config_data["sasl.password"],
"acks": "all"
}),
)
)
logger.info("Airflow Kafka connection updated successfully.")
def producer_function(topic, task_id, **kwargs):
"""Produce Kafka messages for different topics."""
if topic == TOPIC_CSG_INSIGHT_SA_NOTIFY_EVENT_DEV:
message = {"Date": "2022-10-03"}
elif topic == TOPIC_CSG_INSIGHT_MATERIALIZED_VIEW_REFRESH_DEV:
message = {"action": "refresh"}
else:
raise Exception(f"Topic not recognized: {topic}")
key = task_id.encode("utf-8")
value = json.dumps(message).encode("utf-8")
logger.info(f"Producing message for topic {topic} with key: {task_id} and value: {message}")
return ((key, value),)
# DAG definition
with DAG(
"producer_ddcm_conf_airflow_kafka",
default_args=default_args,
description="Simple Kafka Producer Example",
schedule_interval=timedelta(minutes=5),
start_date=datetime.utcnow(),
catchup=False,
) as dag:
# Task to load Kafka connections
t0 = PythonOperator(
task_id="load_connections",
python_callable=load_connections,
)
# Task to produce a message to TOPIC_SA_NOTIFY_EVENT
t1 = ProduceToTopicOperator(
task_id=TASK_ID_CSG_INSIGHT_SA_NOTIFY_EVENT_DEV,
kafka_config_id="kafka_connect_2",
topic=TOPIC_CSG_INSIGHT_SA_NOTIFY_EVENT_DEV,
producer_function=partial(producer_function, TOPIC_CSG_INSIGHT_SA_NOTIFY_EVENT_DEV, TASK_ID_CSG_INSIGHT_SA_NOTIFY_EVENT_DEV),
)
# Task to produce a message to TOPIC_MAT_VIEW_REFRESH
t2 = ProduceToTopicOperator(
task_id=TASK_ID_CSG_INSIGHT_MATERIALIZED_VIEW_REFRESH_DEV,
kafka_config_id="kafka_connect_2",
topic=TOPIC_CSG_INSIGHT_MATERIALIZED_VIEW_REFRESH_DEV,
producer_function=partial(producer_function, TOPIC_CSG_INSIGHT_MATERIALIZED_VIEW_REFRESH_DEV, TASK_ID_CSG_INSIGHT_MATERIALIZED_VIEW_REFRESH_DEV),
)
# Task dependencies
t0 >> [t1, t2]
Please help me with the solution.