
sql - dynamically build where clause from pyspark dataframe - Stack Overflow


I have a pyspark dataframe which looks like this:

df_criterias = spark.createDataFrame(
    [
        ("IN ('ABC')", "IN ('XYZ')", "<2021", "", "Top"),
        ("IN ('ABC')", "NOT IN ('JKL','MNO')", "IN ('2021')", "", "Bottom"),
    ],
    ["CriteriaA", "CriteriaB", "CriteriaC", "CriteriaD", "Result"]
)
display(df_criterias)

I also have a dictionary that looks like this:

col_map = {
"CriteriaA" : "ColumnA",
"CriteriaB" : "ColumnB",
"CriteriaC" : "ColumnC",
"CriteriaD" : "ColumnD"
}

For each record of the pyspark dataframe above, I want to build a where clause statement, e.g. "ColumnA IN ('ABC') AND ColumnB IN ('XYZ') AND ColumnC < 2021"

Note that ColumnD is not included in the where clause statement above because CriteriaD has no value in the dataframe.

The idea is then to filter another dataframe using this where clause statement at the end.
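Stripped of the Spark machinery, the core string-building logic can be sketched in plain Python (the `row` dict below is a hypothetical stand-in for one collected row of `df_criterias`):

```python
col_map = {"CriteriaA": "ColumnA", "CriteriaB": "ColumnB",
           "CriteriaC": "ColumnC", "CriteriaD": "ColumnD"}

# Hypothetical stand-in for one collected row of df_criterias
row = {"CriteriaA": "IN ('ABC')", "CriteriaB": "IN ('XYZ')",
       "CriteriaC": "<2021", "CriteriaD": "", "Result": "Top"}

# Map each criteria column to its target column, skipping blank criteria
parts = [f"{col_map[k]} {v}" for k, v in row.items() if k in col_map and v != ""]
where_clause = " AND ".join(parts)
# where_clause == "ColumnA IN ('ABC') AND ColumnB IN ('XYZ') AND ColumnC <2021"
```

Note that CriteriaD drops out automatically because its value is blank, which is exactly the behaviour asked for above.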


Asked Mar 31 at 16:09 by tommyhmt · edited 2 days ago · 5 comments
  • I don't think it is a good idea to put your filters in a dataframe. Spark doesn't work like that – abiratsis Commented Apr 1 at 9:15
  • thanks but it's not really up to me, that's how the client stores the data. It's ok I've worked it out anyway just need to test it with a larger dataset now – tommyhmt Commented Apr 1 at 11:31
  • How are we supposed to know which line is for each dict item? In your dictionary you set the keys as this df's columns and the values as the other df's column names, but with no condition that distinguishes one line from another. Your example: "ColumnA IN ('ABC') AND ColumnB IN ('XYZ') AND ColumnC < 2021". Breaking it down: ColumnA IN ('ABC') is fine, both lines are IN ('ABC'); for ColumnB, how do we know whether it is IN ('XYZ') or NOT IN ('JKL', 'MNO')? For ColumnC, how are we supposed to choose between < 2021 and IN ('2021')? That's how I think about it, sorry – Fred Alisson Commented Apr 1 at 13:19
  • @tommyhmt did you end up figuring this out? if so, you could answer your own question so that people who have a similar problem in the future can find your answer as well, thanks! – Derek O Commented Apr 1 at 18:27
  • @Fred Alisson the aim here is just to build a where clause, but if you really must know if in the other dataframe Column C is less than 2021 then it satisfies the first rule, if it's 2021 then it satisfies the second rule – tommyhmt Commented Apr 2 at 8:44

3 Answers

You can leverage concat_ws to build your condition.

First use concat_ws to prepend each column name to its criteria. Then use concat_ws again to join all the criteria with AND. (array_compact, which drops the nulls produced for blank criteria, requires Spark 3.4+.)

from pyspark.sql import functions as F

df = (df_criterias.withColumn('where',
                              F.array([F.when(F.col(key) != '', F.concat_ws(' ', F.lit(value), F.col(key)))
                                       for key, value in col_map.items()]))
      .withColumn('where', F.concat_ws(' AND ', F.array_compact(F.col('where'))))
      .select('where'))

This is the solution I came up with, which is completely dynamic, so df_criterias can have as many condition columns as needed:

from pyspark.sql.functions import col

df_criterias = spark.createDataFrame(
    [
        ("IN ('ABC')", "IN ('XYZ')", "<2021", "", "Top"),
        ("IN ('ABC')", "NOT IN ('JKL','MNO')", "IN ('2021')", "", "Bottom"),
    ],
    ["CriteriaA", "CriteriaB", "CriteriaC", "CriteriaD", "Result"]
)

# Avoid shadowing the built-in dict
col_map = {
    "CriteriaA": "ColumnA",
    "CriteriaB": "ColumnB",
    "CriteriaC": "ColumnC",
    "CriteriaD": "ColumnD"
}

# Rename rule columns, keeping only the columns defined in the mapping above
df_criterias_renamed = df_criterias.select([col(x).alias(col_map.get(x, x)) for x in col_map.keys()])

# Get the distinct combinations of rules
rows_criterias = df_criterias_renamed.distinct().collect()

# Cycle through the rules
for row in rows_criterias:
    filters = row.asDict()
    # Ignore blank filters (and the Result column, if present)
    where_clause_list = [f"{k} {v}" for k, v in filters.items() if v != "" and k != "Result"]
    # Combine all clauses together
    where_clause = " AND ".join(where_clause_list)
    print(where_clause)

This can be done by following the steps below:

  1. Convert the dictionary to a dataframe
  2. Transpose the converted dataframe
  3. Join the mapping dataframe with the main dataframe
  4. Remove the blank ColumnD criteria
  5. Combine each rule's conditions into a where clause string

Below is the code:

import pandas as pd
from pyspark.sql import SparkSession, Row, functions as f
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    data=[
        Row("IN ('ABC')", "IN ('XYZ')", "<2021", "", "Top"),
        Row("IN ('ABC')", "NOT IN ('JKL','MNO')", "IN ('2021')", "", "Bottom")
    ],
    schema='CriteriaA string, CriteriaB string, CriteriaC string, CriteriaD string, Result string'
)

df = df.unpivot("Result", ["CriteriaA", "CriteriaB", "CriteriaC", "CriteriaD"], "ColumnName", "val")

mydict = {"CriteriaA": "ColumnA",
          "CriteriaB": "ColumnB",
          "CriteriaC": "ColumnC",
          "CriteriaD": "ColumnD"}

#-------------convert dictionary to dataframe--------------------
map_df = spark.createDataFrame(pd.DataFrame(mydict, index=[0]))
map_df = map_df.withColumn("RowNumber", f.lit(1))

#-----transpose the dataframe----------------------
map_df = map_df.unpivot("RowNumber", ["CriteriaA", "CriteriaB", "CriteriaC", "CriteriaD"], "colname", "val1")

#-------join the mapping with the main dataframe--------
joined = df.join(map_df, df.ColumnName == map_df.colname, "inner").drop("colname")

#-----------drop blank criteria (ColumnD here) and build "Column <condition>" strings----
joined = joined.filter(joined.val != "").select("Result", f.concat("val1", f.lit(" "), "val").alias("wherecond"))

#----combine each rule's conditions into a single where clause----
# group by Result so that each rule row yields its own clause
result = joined.groupBy("Result").agg(f.concat_ws(" AND ", f.collect_list("wherecond")).alias("where_clause"))
for row in result.collect():
    print(row["Result"], "->", row["where_clause"])
