
python - AWS Glue error occurred while calling o149.toDF.TimestampNTZType


When attempting to use Athena to read data that I exported from a MySQL Aurora DB, an error occurred saying that the column payment_date contains mismatched types. The schema has the data type for payment_date set to "int", but some of the values are of data type "date".

HIVE_PARTITION_SCHEMA_MISMATCH: There is a mismatch between the table and partition schemas. The types are incompatible and cannot be coerced. The column 'payment_date' in table 'example-db.example-table' is declared as type 'int', but partition 'partition_0=29market/partition_1=order' declared column 'payment_date' as type 'date'.

As a solution, I am trying to write code that takes a Glue database table and transforms the column "payment_date" so that all of its values are cast to a string data type (I also tried int and date, but the same error kept occurring).

This is the error that is occurring every time:

Error Category: UNCLASSIFIED_ERROR; Failed Line Number: 52; An error occurred while calling o149.toDF. TimestampNTZType (of class org.apache.spark.sql.types.TimestampNTZType$)

My understanding is that the error occurs during the conversion from DynamicFrame to DataFrame. But I'm not sure how to prevent it from happening.
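For reference, printing the DynamicFrame schema before calling toDF() should show which fields the catalog read resolves to timestamp types. A minimal sketch of that check (reusing the same catalog table as in the job below):

# Sketch: inspect the DynamicFrame schema before converting to a DataFrame,
# so any timestamp-typed fields are visible before toDF() is ever called
debug_dyf = glueContext.create_dynamic_frame.from_catalog(
    database="example-database",
    table_name="example-table"
)
debug_dyf.printSchema()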

This is the code that I am using:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, date_format, to_timestamp, when
from pyspark.sql.types import StringType

# Define job parameters
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'TempDir',
])

# Initialize job context
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Define input and output paths/locations
input_database = "example-database"
input_table = "example-table"
output_path = "s3://example-output-bucket"

# Read the input data from the Glue Data Catalog with explicit schema inference
dyf = glueContext.create_dynamic_frame.from_catalog(
    database=input_database,
    table_name=input_table,
    additional_options={"inferSchema": "true"}
)

# Convert DynamicFrame to DataFrame and handle the timestamp conversion
df = dyf.toDF()

# Function to convert various date formats to string
def convert_payment_date_to_string(df):
    return df.withColumn(
        "payment_date",
        when(col("payment_date").isNull(), None)
        .when(col("payment_date").cast("string").isNotNull(), col("payment_date").cast("string"))
        .otherwise(
            date_format(
                to_timestamp(col("payment_date")), 
                "yyyy-MM-dd HH:mm:ss"
            )
        )
    )

# Apply the transformation
transformed_df = convert_payment_date_to_string(df)

# Convert back to DynamicFrame
transformed_dyf = DynamicFrame.fromDF(transformed_df, glueContext, "transformed_data")

# Write the transformed data to S3 in Parquet format
glueContext.write_dynamic_frame.from_options(
    frame=transformed_dyf,
    connection_type="s3",
    connection_options={
        "path": output_path
    },
    format="parquet"
)

# Commit the job
job.commit()

I have tried moving where the DynamicFrame to DataFrame conversion occurs, but nothing seems to fix the error. I have also tried pulling the data straight from an S3 bucket instead of the Glue database, but I am using a crawler to organize the data first (using exclude patterns), which makes it easier to run the Glue job.
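One thing I have been wondering about, but have not verified, is whether casting payment_date on the DynamicFrame itself with resolveChoice, before toDF() is ever called, would sidestep the problem. A rough sketch of what I mean (using the same table and output path as above):

# Sketch (not verified): cast payment_date while the data is still a
# DynamicFrame, so the DataFrame conversion never has to map the
# timestamp type
dyf = glueContext.create_dynamic_frame.from_catalog(
    database=input_database,
    table_name=input_table
)

# resolveChoice with "cast:string" forces the column to string on the
# DynamicFrame side
resolved_dyf = dyf.resolveChoice(specs=[("payment_date", "cast:string")])

glueContext.write_dynamic_frame.from_options(
    frame=resolved_dyf,
    connection_type="s3",
    connection_options={"path": output_path},
    format="parquet"
)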
