
database - Scheduling two linked QuestDB queries with Dagster - Stack Overflow


I want to run two QuestDB queries: one to downsample data older than three weeks, aggregating at 30-second intervals, and one to delete the partition containing the raw data. At the moment I just execute them with a cron job using curl and the REST API:

curl -G --data-urlencode "query=INSERT INTO downsampled_tb SELECT timestamp, min(price) as low, max(price) as high, first(price) as open, last(price) as close FROM trades WHERE timestamp < dateadd('d', -21, now());"

curl -G --data-urlencode "ALTER TABLE trades DROP PARTITION WHERE timestamp < dateadd('d', -21, now());"

I would like to use Dagster instead, so I can backfill past dates and have better error control. I checked Dagster, and I cannot see any QuestDB operator. What would be the best way to do this?


asked Jan 29 at 16:44 by Javier Ramirez

1 Answer


Since Dagster supports writing operators in Python, we can define a resource based on psycopg and then define @ops that use that resource. QuestDB speaks the PostgreSQL wire protocol, so psycopg can connect to it directly.

import psycopg
from datetime import datetime, timedelta

from dagster import op, graph, resource, Out

# Define a resource that connects over QuestDB's PostgreSQL wire protocol
@resource(config_schema={"connection_string": str})
def postgres_resource(context):
    # autocommit so each statement takes effect immediately; otherwise the
    # implicit transaction would be rolled back when the connection closes
    conn = psycopg.connect(context.resource_config["connection_string"], autocommit=True)
    try:
        yield conn
    finally:
        conn.close()

# Downsample one day of raw trades into 30-second OHLC bars
@op(required_resource_keys={'postgres'}, out=Out(str))
def downsample_data(context, execution_date: str):
    sql_query = f"""
        INSERT INTO downsampled_tb SELECT timestamp, min(price) as low,
        max(price) as high, first(price) as open, last(price) as close
        FROM trades WHERE timestamp = '{execution_date}' SAMPLE BY 30s;
    """
    with context.resources.postgres.cursor() as cursor:
        cursor.execute(sql_query)
    return execution_date

# Drop the raw-data partition once it has been downsampled
@op(required_resource_keys={'postgres'}, out=Out(str))
def remove_partition(context, execution_date: str):
    sql_query = f"""
        ALTER TABLE trades DROP PARTITION
        WHERE timestamp = '{execution_date}';
    """
    with context.resources.postgres.cursor() as cursor:
        cursor.execute(sql_query)
    return execution_date

@op(out=Out(str))
def get_execution_date():
    # Dummy placeholder: the day that just crossed the three-week boundary.
    # Replace with a partitioned config (sketched below) to backfill properly.
    return (datetime.now() - timedelta(days=21)).strftime('%Y-%m-%d')

@graph
def questdb_downsampler():
    execution_date = get_execution_date()
    remove_partition(downsample_data(execution_date))

Note that in this case I am using a connection_string that should be provided at launch time, and a dummy get_execution_date, which should really be derived from Dagster partitions so we can backfill properly.
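For illustration, here is a minimal sketch of that partitioned approach (the start date and connection string are placeholders; QuestDB's PostgreSQL wire protocol listens on port 8812 by default, with admin/quest/qdb as the default credentials). get_execution_date reads its date from op config, and a daily_partitioned_config supplies one date per partition along with the resource config:

from dagster import daily_partitioned_config

@op(config_schema={"execution_date": str}, out=Out(str))
def get_execution_date(context):
    # One date per partition, injected through the run config below
    return context.op_config["execution_date"]

@daily_partitioned_config(start_date=datetime(2024, 1, 1))
def partitioned_config(start: datetime, _end: datetime):
    return {
        "ops": {
            "get_execution_date": {
                "config": {"execution_date": start.strftime("%Y-%m-%d")}
            }
        },
        "resources": {
            "postgres": {
                "config": {"connection_string": "postgresql://admin:quest@localhost:8812/qdb"}
            }
        },
    }

questdb_downsampler_job = questdb_downsampler.to_job(
    config=partitioned_config,
    resource_defs={"postgres": postgres_resource},
)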

I am making both @ops return the execution date so that I can pass the output of the first as an input to the second and declare a dependency between them. If the first op raises an error, the second will not run.
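With the partitioned job sketched above, a single past date can be rerun in-process like this (the partition key is an example), and backfills over a range of partitions can be launched from the Dagster UI:

result = questdb_downsampler_job.execute_in_process(partition_key="2024-01-05")
assert result.success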
