Databricks DLT DataFrame - How to use Schemas with Comments - Stack Overflow

Databricks DLT DataFrame - How to use Schemas

I'm new to Databricks Delta Live Tables and DataFrames, and I'm confused about how to use schemas when reading from the stream. I'm doing table to table streaming. One of our requirements is to have comments on the columns in the tables that can be viewed from the DBX Catalog Overview page.

The input table has the following columns (trimmed down for brevity):

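from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType, StringType, StructField, StructType

message_schema = StructType([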
    StructField("bayId", StringType(), True, {"comment": "Bay ID"}),
    StructField("storeId", StringType(), True, {"comment": "Store ID"}),
    StructField("message", StringType(), True, {"comment": "Message content"}),
])

The message column above contains a stringified JSON object, with the following fields (trimmed down for brevity):

event_schema = StructType([
    StructField("Id", StringType(), True, {"comment": "Event ID"}),
    StructField("Payload", StructType([
        StructField("PlayerDexterity", StringType(), True, {"comment": "Player dexterity"}),
        StructField("AttackAngle", FloatType(), True, {"comment": "The vertical angle at which the club head approaches the ball"}),
    ])),
])

Here's my code to read the table from the stream and build the DF:

    df = spark.readStream.table("tablename")

    df = df.where(
        col("MESSAGE").isNotNull()
    ).select(
        col("BAYID"),
        col("STOREID"),
        F.from_json(col("MESSAGE"), message_schema).alias("MESSAGE")
    ).select(
        col("BAYID"),
        col("STOREID"),
        F.from_json(col("MESSAGE.message"), event_schema).alias("EVENT")
    ).select(
        F.expr("uuid()").alias("ID"),
        col("BAYID").alias("BAY_ID"),
        col("STOREID").alias("STORE_ID"),
        col("EVENT.Payload.PlayerDexterity").alias("PLAYER_DEXTERITY_NAME"),
        col("EVENT.Payload.AttackAngle").alias("ATTACK_ANGLE_NBR")
    )

After I run this, when I view the Catalog overview of the output table, the PLAYER_DEXTERITY_NAME and ATTACK_ANGLE_NBR columns are showing the comments that I set in the schema. However, the BAY_ID and STORE_ID columns do not have my comments.

I was able to build the comments by adding the following code after the block above:

    df = (df
          .withMetadata("BAY_ID", {"comment": "Bay ID"})
          .withMetadata("STORE_ID", {"comment": "Store ID"})
          )

However, for the sake of consistency, I would like to set the comments in the schema itself. How can I do that? What am I doing wrong?

UPDATE:

In the first answer below, it was suggested to use the schema on the @dlt.table(). However, we need to use @dlt.view(), which does not allow specifying a schema.

However, I see that there is a schema() on the DataStreamReader, so I tried this:

    df = (
        spark.readStream
        .schema(message_schema)
        .table(
            f"{bronze_catalog}.{bronze_schema}.{bronze_table_name}",
        )
    )

Unfortunately, though, this made no difference in the tables.

Asked Nov 19, 2024 at 2:58 by Westy; edited Nov 19, 2024 at 21:27.

Comment from JayashankarGS (Nov 19, 2024 at 3:50): Are you using the schema while reading the table?

1 Answer

You can pass the schema to the @dlt.table decorator, as shown below.

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True),
])

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

In your case, define a schema for the DataFrame that the DLT function returns, with a comment on every output column, and pass that schema to the decorator.

import dlt
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType, StringType, StructField, StructType

output_schema = StructType([
    StructField("ID", StringType(), True, {"comment": "Unique identifier for the row"}),
    StructField("BAY_ID", StringType(), True, {"comment": "Bay ID"}),
    StructField("STORE_ID", StringType(), True, {"comment": "Store ID"}),
    StructField("PLAYER_DEXTERITY_NAME", StringType(), True, {"comment": "Player dexterity"}),
    StructField("ATTACK_ANGLE_NBR", FloatType(), True, {"comment": "The vertical angle at which the club head approaches the ball"})
])

event_schema = StructType([
    StructField("Id", StringType(), True, {"comment": "Event ID"}),
    StructField("Payload", StructType([
        StructField("PlayerDexterity", StringType(), True, {"comment": "Player dexterity"}),
        StructField("AttackAngle", FloatType(), True, {"comment": "The vertical angle at which the club head approaches the ball"}),
    ])),
])


@dlt.table(
  comment="Raw data on sales",
  schema=output_schema)
def sales():
  df = spark.readStream.table("tablename")

  # Parse the JSON string in MESSAGE, then flatten it into the output columns.
  df = (
      df.where(F.col("MESSAGE").isNotNull())
      .select(
          F.col("BAYID"),
          F.col("STOREID"),
          F.from_json(F.col("MESSAGE"), event_schema).alias("EVENT"),
      )
      .select(
          F.expr("uuid()").alias("ID"),
          F.col("BAYID").alias("BAY_ID"),
          F.col("STOREID").alias("STORE_ID"),
          F.col("EVENT.Payload.PlayerDexterity").alias("PLAYER_DEXTERITY_NAME"),
          F.col("EVENT.Payload.AttackAngle").alias("ATTACK_ANGLE_NBR"),
      )
  )
  return df

Below is the sample data used.

sample_data = [ 
("BAY001", "STORE123", '{"Id":"E001", "Payload":{"PlayerDexterity":"High", "AttackAngle":45.5}}'),
("BAY002", "STORE456", '{"Id":"E002", "Payload":{"PlayerDexterity":"Medium", "AttackAngle":30.0}}'), 
("BAY003", "STORE789", '{"Id":"E003", "Payload":{"PlayerDexterity":"Low", "AttackAngle":15.2}}') ]
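
To make the mapping from the raw message to the output columns concrete, here is a plain-Python sketch (no Spark involved) that parses the same sample rows with the standard json module and pulls out the two payload fields. The function name flatten_event is hypothetical. It only mirrors what from_json plus the final select do in the pipeline, and it leaves out the generated ID column.

```python
import json

# Same rows as the sample data above: (BAYID, STOREID, MESSAGE).
sample_data = [
    ("BAY001", "STORE123", '{"Id":"E001", "Payload":{"PlayerDexterity":"High", "AttackAngle":45.5}}'),
    ("BAY002", "STORE456", '{"Id":"E002", "Payload":{"PlayerDexterity":"Medium", "AttackAngle":30.0}}'),
    ("BAY003", "STORE789", '{"Id":"E003", "Payload":{"PlayerDexterity":"Low", "AttackAngle":15.2}}'),
]


def flatten_event(bay_id, store_id, message):
    """Hypothetical stand-in for from_json(...) followed by the final select."""
    event = json.loads(message)
    payload = event.get("Payload") or {}
    return {
        "BAY_ID": bay_id,
        "STORE_ID": store_id,
        "PLAYER_DEXTERITY_NAME": payload.get("PlayerDexterity"),
        "ATTACK_ANGLE_NBR": payload.get("AttackAngle"),
    }


rows = [flatten_event(*row) for row in sample_data]
for row in rows:
    print(row)
```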

[Screenshots: the query output, and the column comments in the Overview tab]
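
For the @dlt.view case from the question's update, where a schema argument is not available, one possible approach is to keep all comments in a single StructType (such as output_schema above) and copy them onto the DataFrame with withMetadata, the same call the question already uses. The helper names apply_schema_comments and columns_without_comment are hypothetical. This is a sketch that assumes the field names in the schema match the DataFrame's column names exactly, and it has not been verified inside a DLT pipeline.

```python
from typing import TYPE_CHECKING, List

if TYPE_CHECKING:  # used for type hints only; the helpers need no Spark session
    from pyspark.sql import DataFrame
    from pyspark.sql.types import StructType


def apply_schema_comments(df: "DataFrame", schema: "StructType") -> "DataFrame":
    """Copy each field's "comment" from `schema` onto the matching column of `df`.

    Hypothetical helper: fields without a comment, or without a matching
    column in `df`, are skipped.
    """
    existing = set(df.columns)
    for field in schema.fields:
        comment = (field.metadata or {}).get("comment")
        if comment and field.name in existing:
            # Same call as in the question, driven by the schema instead of literals.
            df = df.withMetadata(field.name, {"comment": comment})
    return df


def columns_without_comment(df: "DataFrame") -> List[str]:
    """Return the top-level columns whose metadata has no "comment" entry."""
    return [
        field.name
        for field in df.schema.fields
        if not (field.metadata or {}).get("comment")
    ]
```

Inside a pipeline, the decorated function would return apply_schema_comments(df, output_schema) instead of df, and columns_without_comment(df) can be used beforehand to see which columns (BAY_ID and STORE_ID in the question) still lack a comment.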
