I am trying to read CSV data ingested by the AWS Database Migration Service (DMS), like below:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config(f"spark.sql.catalog.{catalog_nm}", "org.apache.iceberg.spark.SparkCatalog") \
    .config(f"spark.sql.catalog.{catalog_nm}.warehouse", warehouse_path) \
    .config(f"spark.sql.catalog.{catalog_nm}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config(f"spark.sql.catalog.{catalog_nm}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.iceberg.handle-timestamp-without-timezone", "true") \
    .getOrCreate()
raw_df = spark.read.option("header","true").csv(f"s3://.../public/{table_name}/")
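For context, the Iceberg Spark runtime and the AWS SDK v2 bundle also need to be on the classpath; I make them available at session start, roughly like this (the artifact versions here are illustrative, not necessarily the exact ones I run):

from pyspark.sql import SparkSession

# Illustrative sketch: pull the Iceberg runtime and AWS SDK bundle at startup.
# The versions must match the Spark/Scala build actually in use.
spark = (
    SparkSession.builder
    .config(
        "spark.jars.packages",
        "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.4.3,"
        "software.amazon.awssdk:bundle:2.20.18",
    )
    .getOrCreate()
)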
Now I am trying to create an Iceberg table using PySpark, as below:
raw_df.select("id", "city", "state").write.format("iceberg") \
    .mode("overwrite") \
    .saveAsTable(f"{catalog_nm}.{database_op}.{table_op}_11")
But when the data is viewed in Athena, the values in the columns are shuffled: the id column contains the data (insert/update flags) of the op column that the DMS service generates. I also tried defining a schema explicitly and appending the data, but I still had the same issue. I tried dropping the "op" column as well, but when I printed the DataFrame's schema, no op column was present.
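For reference, those two attempts looked roughly like this (the column names and string types are my guesses at the DMS file layout, not a confirmed schema):

from pyspark.sql.types import StructType, StructField, StringType

# Attempt 1: read with an explicit schema, then append instead of overwrite
dms_schema = StructType([
    StructField("id", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
])
schema_df = spark.read.schema(dms_schema).csv(f"s3://.../public/{table_name}/")
schema_df.write.format("iceberg") \
    .mode("append") \
    .saveAsTable(f"{catalog_nm}.{database_op}.{table_op}_11")

# Attempt 2: drop the DMS flag column before writing
raw_df.printSchema()  # no "op" column is listed here
clean_df = raw_df.drop("op").select("id", "city", "state")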
So, what am I missing here?