I have a function check_fun
where the user inputs one table
and additional arguments holding column names. The function then evaluates some checks depending on which arguments were passed in the function call. This works fine, but each evaluation and append takes a lot of time. How can I rewrite it so that all checks are computed in one call?
Data
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, col, when, count
import pandas as pd
# Sample data
# `spark` is not defined anywhere in this snippet; getOrCreate() returns the
# already-running session when there is one (notebook / pyspark shell) and
# starts a new one otherwise.
spark = SparkSession.builder.getOrCreate()
data = [(1, 12, 1, 5), (6, 8, 1, 6), (7, 15, 1, 7), (4, 9, 1, 12), (10, 11, 1, 9)]
columns = ["a", "b", "c", "d"]
df = spark.createDataFrame(data, columns)
The original function
def check_fun(df,
              a_input: str = None,
              b_input: str = None,
              c_input: str = None,
              d_input: str = None):
    """Count the rows of ``df`` that satisfy each applicable column comparison.

    A check is evaluated only when both column names it compares were
    supplied; checks with a missing column name are skipped.

    Args:
        df: Spark DataFrame to inspect.
        a_input: Name of the column used as ``a`` in the checks, or None.
        b_input: Name of the column used as ``b`` in the checks, or None.
        c_input: Name of the column used as ``c`` in the checks, or None.
        d_input: Name of the column used as ``d`` in the checks, or None.

    Returns:
        A pandas DataFrame with the columns ``check``, ``description`` and
        ``count``, holding one row per evaluated check (empty when no check
        could be evaluated).
    """
    columns = ['check', 'description', 'count']

    # (check name, description, column expected to be greater, the other column)
    checks = [
        ('check1', 'a > b', a_input, b_input),
        ('check2', 'a > c', a_input, c_input),
        ('check3', 'a > d', a_input, d_input),
        # 'a < d' is written as d > a so every check shares one comparison.
        # It is guarded on the columns it actually compares (a and d).
        ('check4', 'a < d', d_input, a_input),
    ]

    # Rows are gathered in a plain list and the frame is built once at the
    # end: DataFrame.append is no longer available in pandas 2.x.
    rows = []
    for check_name, description, greater, lesser in checks:
        if greater is None or lesser is None:
            continue
        matching = df.filter(col(greater) > col(lesser)).count()
        rows.append([check_name, description, matching])

    return pd.DataFrame(rows, columns=columns)
How I tried solving it:
# Column names that would normally arrive as function arguments.
a = "a"
b = "b"
c = "c"
d = "d"
# NOTE(review): likely reasons this attempt fails (confirm against the traceback):
#   * `a is not None and b is not None` is evaluated by Python into a plain
#     bool before `when` is called, so the outer `when` does not receive a
#     column expression as its condition.
#   * `.otherwise(None)` is chained onto the result of `sum(...)`, not onto a
#     `when(...)` expression.
df.agg(
when(a is not None and b is not None, sum(when(col(a) > col(b), 1).otherwise(0)).otherwise(None).alias('check1')
)).show()
But this returns an error...
I have a function check_fun
where the user inputs one table
and additional arguments holding column names. The function then evaluates some checks depending on which arguments were passed in the function call. This works fine, but each evaluation and append takes a lot of time. How can I rewrite it so that all checks are computed in one call?
Data
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, col, when, count
import pandas as pd
# Sample data
# `spark` is not defined anywhere in this snippet; getOrCreate() returns the
# already-running session when there is one (notebook / pyspark shell) and
# starts a new one otherwise.
spark = SparkSession.builder.getOrCreate()
data = [(1, 12, 1, 5), (6, 8, 1, 6), (7, 15, 1, 7), (4, 9, 1, 12), (10, 11, 1, 9)]
columns = ["a", "b", "c", "d"]
df = spark.createDataFrame(data, columns)
The original function
def check_fun(df,
              a_input: str = None,
              b_input: str = None,
              c_input: str = None,
              d_input: str = None):
    """Count the rows of ``df`` that satisfy each applicable column comparison.

    A check is evaluated only when both column names it compares were
    supplied; checks with a missing column name are skipped.

    Args:
        df: Spark DataFrame to inspect.
        a_input: Name of the column used as ``a`` in the checks, or None.
        b_input: Name of the column used as ``b`` in the checks, or None.
        c_input: Name of the column used as ``c`` in the checks, or None.
        d_input: Name of the column used as ``d`` in the checks, or None.

    Returns:
        A pandas DataFrame with the columns ``check``, ``description`` and
        ``count``, holding one row per evaluated check (empty when no check
        could be evaluated).
    """
    columns = ['check', 'description', 'count']

    # (check name, description, column expected to be greater, the other column)
    checks = [
        ('check1', 'a > b', a_input, b_input),
        ('check2', 'a > c', a_input, c_input),
        ('check3', 'a > d', a_input, d_input),
        # 'a < d' is written as d > a so every check shares one comparison.
        # It is guarded on the columns it actually compares (a and d).
        ('check4', 'a < d', d_input, a_input),
    ]

    # Rows are gathered in a plain list and the frame is built once at the
    # end: DataFrame.append is no longer available in pandas 2.x.
    rows = []
    for check_name, description, greater, lesser in checks:
        if greater is None or lesser is None:
            continue
        matching = df.filter(col(greater) > col(lesser)).count()
        rows.append([check_name, description, matching])

    return pd.DataFrame(rows, columns=columns)
How I tried solving it:
# Column names that would normally arrive as function arguments.
a = "a"
b = "b"
c = "c"
d = "d"
# NOTE(review): likely reasons this attempt fails (confirm against the traceback):
#   * `a is not None and b is not None` is evaluated by Python into a plain
#     bool before `when` is called, so the outer `when` does not receive a
#     column expression as its condition.
#   * `.otherwise(None)` is chained onto the result of `sum(...)`, not onto a
#     `when(...)` expression.
df.agg(
when(a is not None and b is not None, sum(when(col(a) > col(b), 1).otherwise(0)).otherwise(None).alias('check1')
)).show()
But this returns an error...
Share Improve this question asked Jan 29 at 10:24 MLENMLEN 2,5613 gold badges22 silver badges41 bronze badges 3 |1 Answer
Reset to default 0You're probably after something like this:
def check_fun(df, a=None, b=None, c=None, d=None):
    """Count the rows of ``df`` matching each applicable check in one Spark pass.

    A check is included only when both column names it compares were given.
    All included checks are computed by a single ``df.agg`` call instead of
    one ``filter(...).count()`` job per check.

    Args:
        df: Spark DataFrame to inspect.
        a: Name of the column used as ``a`` in the checks, or None.
        b: Name of the column used as ``b`` in the checks, or None.
        c: Name of the column used as ``c`` in the checks, or None.
        d: Name of the column used as ``d`` in the checks, or None.

    Returns:
        A pandas DataFrame with the columns ``check``, ``description`` and
        ``count``, holding one row per evaluated check (empty when no check
        could be evaluated).
    """
    columns = ['check', 'description', 'count']
    checks = [
        (check_name, description, greater, lesser)
        for greater, lesser, check_name, description in [
            (a, b, "check1", "a > b"),
            (a, c, "check2", "a > c"),
            (a, d, "check3", "a > d"),
            (d, a, "check4", "a < d"),
        ]
        if greater is not None and lesser is not None
    ]
    if not checks:
        # Nothing to evaluate: do not touch Spark at all.
        return pd.DataFrame(columns=columns)

    # Namespaced import: keeps the Spark `count` distinct from any
    # same-named function or builtin in the surrounding module.
    from pyspark.sql import functions as F

    # `when` without `otherwise` yields NULL for non-matching rows and
    # `count` skips NULLs, so each aggregate equals filter(...).count().
    totals = df.agg(*[
        F.count(F.when(F.col(greater) > F.col(lesser), 1)).alias(check_name)
        for check_name, _, greater, lesser in checks
    ]).first()

    return pd.DataFrame(
        [[check_name, description, totals[check_name]]
         for check_name, description, _, _ in checks],
        columns=columns,
    )
Depending on the correct interpretation of 'check4'
, which had an inconsistency in the code you shared.
if
statement: if b_input is not None and c_input is not None:
- however, you're comparing a < d
in the logic that follows? – Grismar Commented Jan 29 at 10:32if
statement, or the logic that follows? You can edit the question to fix it, and people won't need to refer to the comments to make sense of it. – Grismar Commented Jan 29 at 12:53