最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

Apache Airflow Python DAG script does not pull latest config from HC Vault - Stack Overflow

programmeradmin5浏览0评论

I have below sample Python DAG script for Apache Airflow. It is meant for producing messages to two Kafka topics. The Kafka credentials are kept in HC Vault and every time the script runs, it is supposed to fetch latest credential set from Vault. However, it is not happening - here, db.merge_conn() does not update the conn_id kafka_connect_2 with latest Kafka config (say, if there is any update in username, password etc.).

from __future__ import annotations
import json
import logging
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
from airflow.operators.python import PythonOperator
from airflow.models import Connection
from airflow.utils import db
from functools import partial
import hvac
from hvac.api.auth_methods import AppRole

# Setting up logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Default arguments
default_args = {
    "owner": "airflow",
    "depend_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Vault connection constants
ROLE_ID = "<role_id>"
SECRET_ID = "<secret_id>"
VAULT_ADDRESS = ";
NAMESPACE = "<namespace>"
SECRET_PATH = "<secret_path>"
MOUNT_POINT = "<mount_point>"

# Constants for task_id and topic names
TOPIC_CSG_INSIGHT_SA_NOTIFY_EVENT_DEV = "TOPIC_SA_NOTIFY_EVENT"
TOPIC_CSG_INSIGHT_MATERIALIZED_VIEW_REFRESH_DEV = "TOPIC_MAT_VIEW_REFRESH"

TASK_ID_CSG_INSIGHT_SA_NOTIFY_EVENT_DEV = "produce_to_csg_insight_sa_notify_event_dev"
TASK_ID_CSG_INSIGHT_MATERIALIZED_VIEW_REFRESH_DEV = "produce_to_csg_insight_materialized_view_refresh_dev"

def get_vault_secret():
    """Retrieve Kafka configuration from Vault."""
    try:
        client = hvac.Client(url=VAULT_ADDRESS, verify=False, namespace=NAMESPACE)
        approle = AppRole(client.adapter)
        auth_response = approle.login(role_id=ROLE_ID, secret_id=SECRET_ID, mount_point="approle")
        client.token = auth_response["auth"]["client_token"]

        if not client.is_authenticated():
            logger.error("Failed to authenticate with Vault.")
            return None

        logger.info("Successfully authenticated with Vault.")
        response = client.secrets.kv.v2.read_secret(path=SECRET_PATH, mount_point=MOUNT_POINT)

        if "data" in response:
            return response["data"]["data"]
        else:
            logger.error("Failed to retrieve secret from Vault.")
            return None
    except Exception as e:
        logger.error(f"Error while retrieving secrets from Vault: {e}")
        return None

def load_connections():
    """Load or update the Kafka connection in Airflow."""
    # Fetch the latest Kafka configurations from Vault
    config_data = get_vault_secret()
    if not config_data:
        raise ValueError("Failed to fetch configuration from Vault.")

    # Update the Kafka connection in Airflow
    db.merge_conn(
        Connection(
            conn_id="kafka_connect_2",
            conn_type="kafka",
            extra=json.dumps({
                "bootstrap.servers": config_data["bootstrap.servers.config"],
                "security.protocol": config_data["kafka.security.protocol"],
                "sasl.mechanism": config_data["sasl.mechanism"],
                "sasl.username": config_data["sasl.username"],
                "sasl.password": config_data["sasl.password"],
                "acks": "all"
            }),
        )
    )
    logger.info("Airflow Kafka connection updated successfully.")

def producer_function(topic, task_id, **kwargs):
    """Produce Kafka messages for different topics."""
    if topic == TOPIC_CSG_INSIGHT_SA_NOTIFY_EVENT_DEV:
        message = {"Date": "2022-10-03"}
    elif topic == TOPIC_CSG_INSIGHT_MATERIALIZED_VIEW_REFRESH_DEV:
        message = {"action": "refresh"}
    else:
        raise Exception(f"Topic not recognized: {topic}")
    
    key = task_id.encode("utf-8")
    value = json.dumps(message).encode("utf-8")
    
    logger.info(f"Producing message for topic {topic} with key: {task_id} and value: {message}")
    return ((key, value),)

# DAG definition
with DAG(
    "producer_ddcm_conf_airflow_kafka",
    default_args=default_args,
    description="Simple Kafka Producer Example",
    schedule_interval=timedelta(minutes=5),
    start_date=datetime.utcnow(),
    catchup=False,
) as dag:
    # Task to load Kafka connections
    t0 = PythonOperator(
        task_id="load_connections",
        python_callable=load_connections,
    )

    # Task to produce a message to TOPIC_SA_NOTIFY_EVENT
    t1 = ProduceToTopicOperator(
        task_id=TASK_ID_CSG_INSIGHT_SA_NOTIFY_EVENT_DEV,
        kafka_config_id="kafka_connect_2",
        topic=TOPIC_CSG_INSIGHT_SA_NOTIFY_EVENT_DEV,
        producer_function=partial(producer_function, TOPIC_CSG_INSIGHT_SA_NOTIFY_EVENT_DEV, TASK_ID_CSG_INSIGHT_SA_NOTIFY_EVENT_DEV),
    )

    # Task to produce a message to TOPIC_MAT_VIEW_REFRESH
    t2 = ProduceToTopicOperator(
        task_id=TASK_ID_CSG_INSIGHT_MATERIALIZED_VIEW_REFRESH_DEV,
        kafka_config_id="kafka_connect_2",
        topic=TOPIC_CSG_INSIGHT_MATERIALIZED_VIEW_REFRESH_DEV,
        producer_function=partial(producer_function, TOPIC_CSG_INSIGHT_MATERIALIZED_VIEW_REFRESH_DEV, TASK_ID_CSG_INSIGHT_MATERIALIZED_VIEW_REFRESH_DEV),
    )

    # Task dependencies
    t0 >> [t1, t2]

Please help me with the solution.

发布评论

评论列表(0)

  1. 暂无评论