When attempting to use Athena to read data that I exported from a MySQL Aurora DB, an error occurred saying the column payment_date contains mismatched types. The schema has the data type for payment_date set to "int", but some of the values are of data type "date".
HIVE_PARTITION_SCHEMA_MISMATCH: There is a mismatch between the table and partition schemas. The types are incompatible and cannot be coerced. The column 'payment_date' in table 'example-db.example-table' is declared as type 'int', but partition 'partition_0=29market/partition_1=order' declared column 'payment_date' as type 'date'.
As a solution, I am trying to write code that takes a Glue database table and transforms the column "payment_date" so that all of its values are converted to a string data type (I also tried int and date, but the same error kept occurring).
This is the error that is occurring every time:
Error Category: UNCLASSIFIED_ERROR; Failed Line Number: 52; An error occurred while calling o149.toDF. TimestampNTZType (of class org.apache.spark.sql.types.TimestampNTZType$)
My understanding is that the error occurs during the conversion from DynamicFrame to DataFrame, but I'm not sure how to prevent it from happening.
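One idea I have been looking at (just a rough sketch, and I have not confirmed it avoids the error) is to cast the column on the DynamicFrame itself with resolveChoice before calling toDF, so the problematic type never reaches the DataFrame conversion:

# Rough sketch: cast payment_date while it is still a DynamicFrame, then convert;
# "dyf" is the frame read from the catalog as in the full code below
dyf_resolved = dyf.resolveChoice(specs=[("payment_date", "cast:string")])
df = dyf_resolved.toDF()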
This is the code that I am using:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, date_format, to_timestamp, when
from pyspark.sql.types import StringType
# Define job parameters
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'TempDir',
])
# Initialize job context
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Define input and output paths/locations
input_database = "example-database"
input_table = "example-table"
output_path = "s3://example-output-bucket"
# Read the input data from the Glue Data Catalog with explicit schema inference
dyf = glueContext.create_dynamic_frame.from_catalog(
    database=input_database,
    table_name=input_table,
    additional_options={"inferSchema": "true"}
)
# Convert DynamicFrame to DataFrame and handle the timestamp conversion
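# NOTE: the "o149.toDF ... TimestampNTZType" failure above is raised by this
# toDF() call, i.e. during the DynamicFrame-to-DataFrame conversion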
df = dyf.toDF()
# Function to convert various date formats to string
def convert_payment_date_to_string(df):
    return df.withColumn(
        "payment_date",
        when(col("payment_date").isNull(), None)
        .when(col("payment_date").cast("string").isNotNull(), col("payment_date").cast("string"))
        .otherwise(
            date_format(
                to_timestamp(col("payment_date")),
                "yyyy-MM-dd HH:mm:ss"
            )
        )
    )
# Apply the transformation
transformed_df = convert_payment_date_to_string(df)
# Convert back to DynamicFrame
transformed_dyf = DynamicFrame.fromDF(transformed_df, glueContext, "transformed_data")
# Write the transformed data to S3 in Parquet format
glueContext.write_dynamic_frame.from_options(
    frame=transformed_dyf,
    connection_type="s3",
    connection_options={
        "path": output_path
    },
    format="parquet"
)
# Commit the job
job.commit()
I have tried moving where the DynamicFrame-to-DataFrame conversion occurs, but nothing seems to fix the error. I have also tried pulling the data straight from an S3 bucket instead of the Glue database, but I am using a crawler to organize the data first (using exclude patterns), which makes it easier to run the Glue job.
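For reference, when I pulled the data straight from S3 it was roughly along these lines (a sketch only; the bucket path and format here are placeholders, not my real values):

# Rough sketch of the direct-from-S3 read attempt (placeholder path and format)
dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://example-input-bucket/"]},
    format="parquet"
)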