Currently I'm making calculations using PySpark and trying to match data from multiple dataframes on specific conditions.
I'm new to PySpark, so I decided to ask for help.
My first dataframe contains general information about loans:
ID ContractDate MaturityDate Bank
ID1 2024-06-01 2024-06-18 A
ID2 2024-06-05 2024-06-18 B
ID3 2024-06-10 2024-06-17 C
ID4 2024-06-15 D
ID5 2024-08-01 2024-08-22 A
ID6 2024-08-08 2024-08-23 B
ID7 2024-08-20 D
My second dataframe contains information on how payments were made.
For each loan there are one or more payments.
ID_loan PaymentDate PaymentSum
ID1 2024-06-02 10
ID1 2024-06-08 40
ID1 2024-06-10 50
ID2 2024-06-06 30
ID2 2024-06-07 90
ID2 2024-06-08 20
ID3 2024-06-11 20
ID3 2024-06-12 30
ID3 2024-06-13 50
ID5 2024-08-10 15
ID5 2024-08-13 35
ID5 2024-08-15 30
ID6 2024-08-15 20
ID6 2024-08-16 20
ID6 2024-08-20 70
My goal is to add a column 'PaymentSum' to the first dataframe that, for each loan, returns the amount of the payment made on the date closest to the 'ContractDate' of a loan issued by bank 'D'.
In other words, I have to get the following table:
ID ContractDate MaturityDate Bank PaymentSum
ID1 2024-06-01 2024-06-18 A 50
ID2 2024-06-05 2024-06-18 B 20
ID3 2024-06-10 2024-06-17 C 50
ID4 2024-06-15 D
ID5 2024-08-01 2024-08-22 A 30
ID6 2024-08-08 2024-08-23 B 70
ID7 2024-08-20 D
I do understand that joins alone are not enough here. Any help is highly appreciated!
2 Answers
You need to use a window function in this scenario.
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.window import Window

df_1_ref = df_1.withColumn(
    "closest_contract_date_from_D",
    # first non-null ContractDate of a bank "D" loan, looking from the current row forward
    func.first(
        func.when(func.col("Bank") == "D", func.col("ContractDate")).otherwise(None),
        ignorenulls=True
    ).over(
        Window.orderBy(func.asc("ContractDate")).rowsBetween(Window.currentRow, Window.unboundedFollowing)
    )
).select(
    "ID", "closest_contract_date_from_D"
)
First you create a reference table that collects, for each loan, the closest ContractDate of a loan issued by bank D. The method used here is a window function with first(..., ignorenulls=True): for each row it only searches from the current row to the unbounded following rows, ordered by ContractDate.
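With the sample data from the question (and assuming ContractDate is a date column or an ISO-formatted string, so ordering by it is chronological), df_1_ref should contain one reference date per loan, roughly like this (row order may differ):
df_1_ref.show()
# +---+----------------------------+
# | ID|closest_contract_date_from_D|
# +---+----------------------------+
# |ID1|                  2024-06-15|
# |ID2|                  2024-06-15|
# |ID3|                  2024-06-15|
# |ID4|                  2024-06-15|
# |ID5|                  2024-08-20|
# |ID6|                  2024-08-20|
# |ID7|                  2024-08-20|
# +---+----------------------------+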
df_2 = df_2.join(
    df_1_ref,
    on=[
        func.col("ID") == func.col("ID_loan"),
        func.col("PaymentDate") <= func.col("closest_contract_date_from_D")
    ],
    how="left"
).withColumn(
    "rank", func.rank().over(Window.partitionBy("ID_loan").orderBy(func.desc("PaymentDate")))
).filter(
    func.col("rank") == 1
).selectExpr(
    "ID_loan AS ID", "PaymentSum"
)
Then you join the reference table to df_2. The <= in the join condition makes sure you only keep payments made on or before the reference date for each record. A rank function ordered by PaymentDate descending then finds the closest one.
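For the sample data, the filtered df_2 should reduce to one payment per loan that has payments (row order may differ):
df_2.show()
# +---+----------+
# | ID|PaymentSum|
# +---+----------+
# |ID1|        50|
# |ID2|        20|
# |ID3|        50|
# |ID5|        30|
# |ID6|        70|
# +---+----------+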
df_1 = df_1.join(df_2, on=["ID"], how="left")
Then you join it back to the main dataframe.
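For the sample data, the final df_1 should then match the expected table from the question; a quick check (sorted by ID for readability):
df_1.orderBy("ID").show()
# expected: ID1 -> 50, ID2 -> 20, ID3 -> 50, ID4 -> null (bank D loan, no payments),
#           ID5 -> 30, ID6 -> 70, ID7 -> null (bank D loan, no payments)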
To resolve your issue, please follow the code below. For the sample I am using loans_data1 and pay_data1 as dataframes.
Code:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, datediff, abs, row_number
from pyspark.sql.window import Window

# In a plain PySpark session, create/get the SparkSession first
# (in Databricks notebooks `spark` is already defined)
spark = SparkSession.builder.getOrCreate()
loans_data1 = [
("ID1", "2024-06-01", "2024-06-18", "A"),
("ID2", "2024-06-05", "2024-06-18", "B"),
("ID3", "2024-06-10", "2024-06-17", "C"),
("ID4", "2024-06-15", None, "D"),
("ID5", "2024-08-01", "2024-08-22", "A"),
("ID6", "2024-08-08", "2024-08-23", "B"),
("ID7", "2024-08-20", None, "D")
]
pay_data1 = [
("ID1", "2024-06-02", 10),
("ID1", "2024-06-08", 40),
("ID1", "2024-06-10", 50),
("ID2", "2024-06-06", 30),
("ID2", "2024-06-07", 90),
("ID2", "2024-06-08", 20),
("ID3", "2024-06-11", 20),
("ID3", "2024-06-12", 30),
("ID3", "2024-06-13", 50),
("ID5", "2024-08-10", 15),
("ID5", "2024-08-13", 35),
("ID5", "2024-08-15", 30),
("ID6", "2024-08-15", 20),
("ID6", "2024-08-16", 20),
("ID6", "2024-08-20", 70)
]
loans_df1 = spark.createDataFrame(loans_data1, ["ID", "ContractDate", "MaturityDate", "Bank"])
payments_df = spark.createDataFrame(pay_data1, ["ID_loan", "PaymentDate", "PaymentSum"])
# First step: filter for Bank 'D', select its ContractDate, and cross join it with the loans
bank_d_loans_df1 = loans_df1.filter(col("Bank") == "D").select(col("ContractDate").alias("BankD_ContractDate"))
cross_join_df = loans_df1.crossJoin(bank_d_loans_df1)
# Join payments to the cross-join result
joined_df = cross_join_df.join(
payments_df,
col("ID") == col("ID_loan"),
"left"
).withColumn("DateDiff", abs(datediff(col("BankD_ContractDate"), col("PaymentDate"))))
# Use a window function
window_spec = Window.partitionBy("ID").orderBy("DateDiff")
closest_payment_df = joined_df.withColumn("RowNum", row_number().over(window_spec)).filter(col("RowNum") == 1).drop("RowNum", "DateDiff")
# Select the columns
result_df = closest_payment_df.select(
col("ID"),
col("ContractDate"),
col("MaturityDate"),
col("Bank"),
col("PaymentSum")
)
display(result_df)
Output:
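display() is a notebook utility (e.g. in Databricks); in a plain PySpark session you can print the same result with show(). For the sample data the result should match the expected table from the question:
result_df.orderBy("ID").show()
# +---+------------+------------+----+----------+
# | ID|ContractDate|MaturityDate|Bank|PaymentSum|
# +---+------------+------------+----+----------+
# |ID1|  2024-06-01|  2024-06-18|   A|        50|
# |ID2|  2024-06-05|  2024-06-18|   B|        20|
# |ID3|  2024-06-10|  2024-06-17|   C|        50|
# |ID4|  2024-06-15|        null|   D|      null|
# |ID5|  2024-08-01|  2024-08-22|   A|        30|
# |ID6|  2024-08-08|  2024-08-23|   B|        70|
# |ID7|  2024-08-20|        null|   D|      null|
# +---+------------+------------+----+----------+
# (null rendering and row order may differ depending on the Spark version)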