I have a pyspark dataframe which looks like this:
df_criterias = spark.createDataFrame(
    [
        ("IN ('ABC')", "IN ('XYZ')", "<2021", "", "Top"),
        ("IN ('ABC')", "NOT IN ('JKL','MNO')", "IN ('2021')", "", "Bottom"),
    ],
    ["CriteriaA", "CriteriaB", "CriteriaC", "CriteriaD", "Result"]
)
display(df_criterias)
I also have a dictionary that looks like this:
col_map = {
    "CriteriaA": "ColumnA",
    "CriteriaB": "ColumnB",
    "CriteriaC": "ColumnC",
    "CriteriaD": "ColumnD"
}
For each record of the pyspark dataframe above, I want to build a where clause statement, e.g. "ColumnA IN ('ABC') AND ColumnB IN ('XYZ') AND ColumnC < 2021"
Note that ColumnD is not included in the where clause statement above because CriteriaD has no value in the dataframe.
The idea is then to filter another dataframe using this where clause statement at the end.
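For illustration, with the two sample rows above the desired statements would be roughly:
- Top: "ColumnA IN ('ABC') AND ColumnB IN ('XYZ') AND ColumnC <2021"
- Bottom: "ColumnA IN ('ABC') AND ColumnB NOT IN ('JKL','MNO') AND ColumnC IN ('2021')"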
- I don't think it is a good idea to put your filters in a dataframe. Spark doesn't work like that – abiratsis Commented Apr 1 at 9:15
- Thanks, but it's not really up to me; that's how the client stores the data. It's OK, I've worked it out anyway, I just need to test it with a larger dataset now – tommyhmt Commented Apr 1 at 11:31
- How are we supposed to know which line goes with each dict item? In your dictionary you set the keys as this df's columns and the values as the other df's column names, but with no condition that distinguishes one line from another. Your example: "ColumnA IN ('ABC') AND ColumnB IN ('XYZ') AND ColumnC < 2021". Breaking it down: ColumnA IN ('ABC') is fine, both lines are IN ('ABC'); for ColumnB, how do we know if it is IN ('XYZ') or NOT IN ('JKL','MNO')?; for ColumnC, how are we supposed to know whether to choose < 2021 or IN ('2021')? This is how I think about it, sorry – Fred Alisson Commented Apr 1 at 13:19
- @tommyhmt did you end up figuring this out? if so, you could answer your own question so that people who have a similar problem in the future can find your answer as well, thanks! – Derek O Commented Apr 1 at 18:27
- @Fred Alisson the aim here is just to build a where clause, but if you really must know: if ColumnC in the other dataframe is less than 2021 then the row satisfies the first rule; if it's 2021 then it satisfies the second rule – tommyhmt Commented Apr 2 at 8:44
3 Answers
You can leverage concat_ws to build your condition. First use concat_ws to prepend the column name to each criterion; then use concat_ws again to join all the criteria with AND.
from pyspark.sql import functions as F

# Prepend each data-column name to its criterion (blank criteria become null),
# drop the nulls with array_compact (Spark 3.4+), then join everything with AND.
df = (df_criterias.withColumn('where',
                              F.array([F.when(F.col(key) != '', F.concat_ws(' ', F.lit(value), F.col(key)))
                                       for key, value in col_map.items()]))
                  .withColumn('where', F.concat_ws(' AND ', F.array_compact(F.col('where'))))
                  .select('where'))
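To then filter another dataframe with each generated clause (the end goal described in the question), a minimal sketch could look like the following; df_data is a hypothetical dataframe, not part of the question, that actually contains ColumnA-ColumnD:

# Hypothetical data dataframe, shown only for illustration.
df_data = spark.createDataFrame(
    [("ABC", "XYZ", "2019", "x"), ("ABC", "MNO", "2021", "y")],
    ["ColumnA", "ColumnB", "ColumnC", "ColumnD"]
)

# Each value in the generated 'where' column is a valid SQL boolean expression,
# so it can be passed straight to DataFrame.filter().
for row in df.select("where").collect():
    df_data.filter(row["where"]).show()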
This is the solution I came up with, which is completely dynamic, so df_criterias can have as many condition columns as needed:
from pyspark.sql.functions import col

df_criterias = spark.createDataFrame(
    [
        ("IN ('ABC')", "IN ('XYZ')", "<2021", "", "Top"),
        ("IN ('ABC')", "NOT IN ('JKL','MNO')", "IN ('2021')", "", "Bottom"),
    ],
    ["CriteriaA", "CriteriaB", "CriteriaC", "CriteriaD", "Result"]
)
col_map = {
    "CriteriaA": "ColumnA",
    "CriteriaB": "ColumnB",
    "CriteriaC": "ColumnC",
    "CriteriaD": "ColumnD"
}
# Rename the rule columns to their data-column names; other columns (e.g. Result) are kept as-is
df_criterias_renamed = df_criterias.select([col(x).alias(col_map.get(x, x)) for x in df_criterias.columns])
# Set up all combinations of rules
rows_criterias = df_criterias_renamed.distinct().collect()
# Cycle through rules
for row in rows_criterias:
    filters = row.asDict()
    # Ignore blank filters and the Result column
    where_clause_list = [f"{k} {v}" for k, v in filters.items() if v != "" and k != "Result"]
    # Combine all clauses together
    where_clause = " AND ".join(where_clause_list)
    print(where_clause)
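As a rough follow-up sketch, the same rules can also be used to tag rows of another dataframe with the Result label of the rule they satisfy, via F.expr; df_data here is a hypothetical dataframe, not part of the question:

from pyspark.sql import functions as F

# Hypothetical data dataframe, shown only for illustration.
df_data = spark.createDataFrame(
    [("ABC", "XYZ", "2019", ""), ("ABC", "PQR", "2021", "")],
    ["ColumnA", "ColumnB", "ColumnC", "ColumnD"]
)

# Chain the rules into one CASE expression; rules iterated later take
# precedence if a row happens to satisfy more than one rule.
label = F.lit(None)
for row in rows_criterias:
    filters = row.asDict()
    clause = " AND ".join(f"{k} {v}" for k, v in filters.items() if v != "" and k != "Result")
    label = F.when(F.expr(clause), filters["Result"]).otherwise(label)

df_data.withColumn("Result", label).show()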
This can be done by following the steps below:
- Convert the dictionary to a DataFrame
- Transpose (unpivot) the converted DataFrame
- Join the dictionary DataFrame with the main DataFrame
- Remove the blank ColumnD value from the DataFrame
- Convert the resulting list to a string
Below is the code:
import pandas as pd
from pyspark.sql import SparkSession, Row, functions as f
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    data=[
        Row("IN ('ABC')", "IN ('XYZ')", "<2021", "", "Top"),
        Row("IN ('ABC')", "NOT IN ('JKL','MNO')", "IN ('2021')", "", "Bottom")
    ],
    schema='CriteriaA string, CriteriaB string, CriteriaC string, CriteriaD string, Result string'
)
df = df.unpivot("Result",["CriteriaA","CriteriaB","CriteriaC","CriteriaD"],"ColumnName","val")
mydict = {"CriteriaA": "ColumnA",
"CriteriaB": "ColumnB",
"CriteriaC": "ColumnC",
"CriteriaD": "ColumnD"
}
#-------------convert dictionary to dataframe--------------------
pd_df = pd.DataFrame(mydict,[0])
pd_df = spark.createDataFrame(pd_df)
pd_df = pd_df.withColumn("RowNumber",f.lit(1))
#-----Transpose the Dataframe ----------------------
pd_df = pd_df.unpivot("RowNumber",["CriteriaA","CriteriaB","CriteriaC","CriteriaD"],"colname","val1")
#-------join Dictionary with Dataframe--------
pd_df = df.join(pd_df,df.ColumnName == pd_df.colname,"inner").drop("colname")
#-----------Remove the blank (ColumnD) criteria and build each condition----
pd_df = pd_df.filter(pd_df.val != "").select("Result", f.concat("val1", f.lit(" "), "val").alias("wherecond"))
#-----------Collect the conditions of each rule row into a list----
pd_df = pd_df.groupBy("Result").agg(f.collect_list("wherecond").alias("wherecond"))
#----Convert each list to an AND-separated string----
for row in pd_df.collect():
    print(row["Result"], "->", " AND ".join(row["wherecond"]))