I am trying to copy data from an Iceberg table to a Postgres table using a Glue job. I have this code:
def execute_job(spark, factory: DependencyFactory, environment, logger):
    print("Starting job")
    sql = f"SELECT * FROM {DATABASE_NAME}.transactions ORDER BY unique_identifier, in_date"
    df = spark.sql(sql)
    df.show(10)

    config = Config(environment)

    window_spec = Window.partitionBy("unique_identifier").orderBy("in_date")
    df = df.withColumn("version", row_number().over(window_spec))
    df = df.withColumn("start_date", col("in_date"))
    df = df.withColumn("end_date", lit("9999-12-31").cast("string"))
    df.show(10)

    df = df.withColumn("prev_start_date", lag("start_date").over(window_spec))
    df = df.withColumn("end_date", when(col("version") == 1, col("end_date")).otherwise(col("prev_start_date")))
    df = df.withColumn("id", col("unique_identifier").cast("string") + ":" + col("version").cast("string"))
    df.show(10)

    total_count = df.count()
    print(f"Total records: {total_count}")
    if total_count == 0:
        print("No records to insert. Exiting.")
        return

    num_partitions = max(1, total_count // 100000)
    processed_df = df.repartition(num_partitions)
    print(f"Number of partitions: {processed_df.rdd.getNumPartitions()}")
    processed_df.show(10)

    def process_partition(partition):
        logger.info("Processing partition")
        print("Processing partition")
        partition_list = list(partition)
        insert_query = """
            INSERT INTO transaction.transactions_master (start_date, end_date, in_date, out_date, update_time, id, version, unique_identifier, id_tr, alias, update_correlation_id, correlation_id)
            VALUES %s
        """
        db_client = PostgresDbClient(DbConfigBuilder(config, config['vault']['service_type'], ssl_cert_file=None))
        if not partition_list:
            print("Skipping empty partition")
            return
        batch_size = 100000
        try:
            values = [tuple(row) for row in partition_list]
            db_client.execute_values_query(insert_query, values, page_size=batch_size)
            print(f"Inserted {len(values)} records successfully")
        except Exception as e:
            print(f"Error inserting partition: {e}")

    try:
        print("Starting foreachPartition")
        processed_df.foreachPartition(process_partition)
        print("Finished foreachPartition")
    except Exception as e:
        print(f"Error during foreachPartition: {e}")

    print("Data successfully copied to transaction.transactions_master PostgreSQL table")
Everything seems to be working (it's printing results in the logs, and I've also tested the DB connection, which seems fine). However, when it gets to processed_df.foreachPartition(process_partition), it's as if the code inside process_partition is not executed. I'm not seeing anything in the logs. The last entries are just "Starting foreachPartition" and "Data successfully copied to transaction.transactions_master PostgreSQL table".
What could be the issue? Am I using foreachPartition wrong?
asked Feb 6 at 14:21 – ekm0d

2 Answers
How foreachPartition works
When you call foreachPartition, Spark:
Serializes the function and sends it to the executors.
Each executor applies the function to its assigned partition(s).
No data is returned to the driver (unlike collect() or show()).
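For example, if you want partition-level feedback back on the driver, one option (a sketch based on the question's processed_df, not code from either answer) is to use mapPartitions and return a small status record per partition instead of foreachPartition:

def insert_partition(rows):
    # A sketch, not the original code: yield a tiny status tuple per
    # partition; collect() brings these tuples (not the data itself)
    # back to the driver, so progress shows up in the driver log.
    rows = list(rows)
    if not rows:
        yield ("empty", 0)
        return
    try:
        # ... open the DB connection and insert `rows` here ...
        yield ("ok", len(rows))
    except Exception as e:
        yield (f"error: {e}", len(rows))

for status, count in processed_df.rdd.mapPartitions(insert_partition).collect():
    print(f"partition status={status}, rows={count}")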
As @Steven has mentioned, you should check the logs on the executor side. Moreover, you need to be careful with DB connections that are established from the executors.
This part:
db_client = PostgresDbClient(DbConfigBuilder(config, config['vault']['service_type'], ssl_cert_file=None))
Check whether the executors have everything needed to connect to PostgreSQL in the same way the driver does: the required libraries, open ports, network configuration, and so on.
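One way to check network reachability from the executors specifically (the host, port, and the probe itself are illustrative assumptions, not part of the original job) is a small socket test fanned out with mapPartitions:

import socket

# PG_HOST and PG_PORT are placeholders; use the values your
# DbConfigBuilder actually resolves to.
PG_HOST = "my-postgres.example.com"
PG_PORT = 5432

def probe(_):
    # Runs on an executor: try a plain TCP connection to PostgreSQL
    # and report the outcome back to the driver.
    try:
        with socket.create_connection((PG_HOST, PG_PORT), timeout=5):
            yield "reachable"
    except OSError as e:
        yield f"unreachable: {e}"

# A few dummy partitions just to fan the probe out across executors.
print(spark.sparkContext.parallelize(range(4), 4).mapPartitions(probe).collect())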
The foreachPartition function runs at the executor level, so its detailed logs appear in the executor logs rather than the driver logs. Worth checking the executor logs. Also, do you see the data getting updated in the PostgreSQL table?
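As a sketch (assuming Glue's usual per-executor CloudWatch log streams), one pattern is to configure a logger inside the partition function itself, since anything it prints or logs lands in the executor logs, not the driver's, and a driver-side logger captured in the closure may not survive pickling:

import logging

def process_partition(rows):
    # Configured inside the function because this code runs on the
    # executor; its output goes to the executor's log stream.
    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger("partition-insert")
    rows = list(rows)
    log.info("processing %d rows on this executor", len(rows))
    # ... PostgreSQL insert goes here ...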
foreach is executed on the worker side. Did you check the worker logs? – Steven Commented Feb 6 at 15:31