
sql - Dbt macro to add metadata into table - Stack Overflow


I would like to write a macro that adds metadata columns to a given model (which may be incremental). I started with this:

{% macro add_metadata() %}
    {% if is_incremental() %}
        , current_timestamp() as updated_at
        , session_user() as updated_by
    {% else %}
        , current_timestamp() as created_at
        , session_user() as created_by
        , null as updated_at
        , null as updated_by
    {% endif %}
{% endmacro %}

The problem appears when I have a staging model like this, for example:

SELECT 1 AS id, 'Alice' AS name, 100 AS score
UNION ALL
SELECT 2, 'Bob', 200
UNION ALL
SELECT 3, 'Charlie', 300

and my target model looks like this:

{{ config(
    materialized='incremental',
    unique_key='id'
) }}

WITH source_data AS (
    SELECT s.*
    {{ add_metadata()}} 
    
    FROM {{ ref('testing_metadata') }} s
)

SELECT *
FROM source_data

The compiled query looks like this:

  merge into `ingest_abc_segmentation`.`incremental_model` as DBT_INTERNAL_DEST
        using (

WITH source_data AS (
    SELECT s.*
    
    
        , current_timestamp() as updated_at
        , session_user() as updated_by
    
 
    
    FROM `ingest_abc_segmentation`.`testing_metadata` s
)

SELECT *
FROM source_data
        ) as DBT_INTERNAL_SOURCE
        on (
                DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id
            )

    
    when matched then update set
        `id` = DBT_INTERNAL_SOURCE.`id`,`name` = DBT_INTERNAL_SOURCE.`name`,`score` = DBT_INTERNAL_SOURCE.`score`,`created_at` = DBT_INTERNAL_SOURCE.`created_at`,`created_by` = DBT_INTERNAL_SOURCE.`created_by`,`updated_at` = DBT_INTERNAL_SOURCE.`updated_at`,`updated_by` = DBT_INTERNAL_SOURCE.`updated_by`
    

    when not matched then insert
        (`id`, `name`, `score`, `created_at`, `created_by`, `updated_at`, `updated_by`)
    values
        (`id`, `name`, `score`, `created_at`, `created_by`, `updated_at`, `updated_by`)

and this is the error I am receiving: Name created_at not found inside DBT_INTERNAL_SOURCE at [37:150]

I always get an error saying that the fields created_at and created_by do not exist, because the merge tries to read them from the staging model. How can I improve this macro so that only updated_at and updated_by change on the second and later runs, while created_at and created_by keep their original values?

Asked Mar 4 at 13:06 by Lukasz; edited Mar 4 at 13:27.

Comments:
  • Can you share the exact error you're getting, as well as the compiled SQL for that model? – Aleix CC, Mar 4 at 13:22
  • I added it to my main post. – Lukasz, Mar 4 at 13:27

1 Answer

You can set incremental_strategy='merge' together with merge_update_columns. From the documentation:

dbt will update only the columns specified by the config, and keep the previous values of other columns.

On the first run the target table does not exist yet, so is_incremental() is false and the model is built from scratch: created_at and created_by are populated, while updated_at and updated_by are null.

On the second and subsequent runs, only updated_at and updated_by are updated for matched rows. created_at and created_by stay the same, because these two columns are not included in merge_update_columns.
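To make that concrete, here is a hand-written sketch of roughly what the MERGE could look like once merge_update_columns is limited to name, score, updated_at and updated_by. It is adapted from the compiled query in the question and is not captured from a real dbt run, so the exact text dbt emits may differ by adapter and version. The source subquery is an assumption written for illustration, and BigQuery syntax is assumed because of the backticked names in the question.

```sql
-- Hand-written sketch, not real dbt output. Adapted from the compiled
-- MERGE in the question, assuming
-- merge_update_columns = ['name', 'score', 'updated_at', 'updated_by'].
merge into `ingest_abc_segmentation`.`incremental_model` as DBT_INTERNAL_DEST
using (
    -- Illustrative source query: it supplies all seven target columns.
    select
        s.id,
        s.name,
        s.score,
        current_timestamp() as created_at,
        session_user() as created_by,
        current_timestamp() as updated_at,
        session_user() as updated_by
    from `ingest_abc_segmentation`.`testing_metadata` s
) as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id

-- Matched rows: only the listed columns are overwritten, so the stored
-- created_at / created_by values in the target are left untouched.
when matched then update set
    `name` = DBT_INTERNAL_SOURCE.`name`,
    `score` = DBT_INTERNAL_SOURCE.`score`,
    `updated_at` = DBT_INTERNAL_SOURCE.`updated_at`,
    `updated_by` = DBT_INTERNAL_SOURCE.`updated_by`

-- Unmatched rows: every column is inserted, so the source query still
-- has to supply created_at / created_by for new ids.
when not matched then insert
    (`id`, `name`, `score`, `created_at`, `created_by`, `updated_at`, `updated_by`)
values
    (`id`, `name`, `score`, `created_at`, `created_by`, `updated_at`, `updated_by`)
```

The insert branch is the reason the source query must still expose created_at and created_by on incremental runs, which is exactly what the original macro left out.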

Here is the target model:

{{ config(
    materialized='incremental',
    unique_key='id',
    incremental_strategy='merge',
    merge_update_columns = ['name','score','updated_at','updated_by']
) }}

WITH source_data AS (
    SELECT s.id, s.name, s.score
    {% if is_incremental() %}
        -- created_* are not in merge_update_columns, so for matched rows
        -- these placeholder values never overwrite what is already stored
        , 'should not update' as created_at
        , 'should not update' as created_by
        , current_timestamp() as updated_at
        , 'session_user_incremental' as updated_by
    {% else %}
        -- first build: only the created_* columns carry values
        , current_timestamp() as created_at
        , 'session_user_first_run' as created_by
        , null as updated_at
        , null as updated_by
    {% endif %}
    FROM {{ ref('staging') }} s
)

SELECT *
FROM source_data

In the target's output after the second run, the created_at and created_by columns keep the values they had after the first run.
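The same idea can be folded back into the add_metadata() macro from the question. The following is a minimal sketch, not a tested implementation. It assumes BigQuery (suggested by the backticked names and the error format in the question), and the file path in the first comment is hypothetical. Unlike the placeholder strings in the model above, it emits real created_at and created_by values on every run, since rows whose id does not match an existing row are inserted with whatever the source query supplies. The typed nulls are also an assumption, added so that updated_at and updated_by get explicit TIMESTAMP and STRING types on the first build.

```sql
{# macros/add_metadata.sql (hypothetical path) #}
{% macro add_metadata() %}
    {# created_* are emitted on every run so that newly inserted rows get
       real values. For matched rows they are ignored, provided they are
       left out of merge_update_columns in the model config. #}
    , current_timestamp() as created_at
    , session_user() as created_by
    {% if is_incremental() %}
    , current_timestamp() as updated_at
    , session_user() as updated_by
    {% else %}
    {# Typed nulls (assumption: BigQuery type names) so the columns are
       created with the intended types on the first build. #}
    , cast(null as timestamp) as updated_at
    , cast(null as string) as updated_by
    {% endif %}
{% endmacro %}
```

The model would call {{ add_metadata() }} after its selected columns, as in the question, and keep the incremental_strategy='merge' and merge_update_columns settings from the target model above. One trade-off of this sketch: a row that is first inserted during an incremental run gets updated_at equal to its created_at rather than null.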
