Given this setup:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from datetime import datetime
spark = SparkSession.builder.getOrCreate()
data = [
    (532, 1234, 5, datetime(2022, 7, 13, 12)),
    (824, 1452, 3, datetime(2022, 7, 13, 12)),
    (819, 2341, 5, datetime(2022, 7, 13, 12)),
    (716, 1325, 4, datetime(2022, 7, 14, 12)),
    (423, 1434, 2, datetime(2022, 7, 14, 12))
]
columns = ["business_id", "review_id", "review_stars", "review_date"]
reviews_df = spark.createDataFrame(data, columns)
I am trying to do the following:
total_reviews = reviews_df.count()
r = reviews_df.alias("r")
main = (
    r
    .filter(r.review_stars.isin([4, 5]))
    .agg(
        F.count("business_id").alias("business_count"),
        F.round(F.col("business_count") / total_reviews * 100, 0).alias("top_rated_pct")
    )
)
main.show()
And yet, even though I can run this code, an AI told me that the business_count column, which was just created in .agg(), can't be referenced immediately. Instead, I should use something like:
.agg(
    F.count("business_id").alias("business_count")
)
.withColumn(
    "top_rated_pct",
    F.round(F.col("business_count") / total_reviews * 100, 0)
)
I tried reading the documentation but still can't make sense of whether what I'm doing is acceptable. Any advice would be appreciated.
Comments:
- Could you please tell me your desired output? business_count = 3, top_rated_pct = 60.0? – Subir Chowdhury
- Not sure which AI you used, but Gemini says you can run the original code and it works. – Emma
1 Answer
Your AI is correct. You cannot reference a column within the same select or agg in which it is created. It is the same in SQL, by the way.
Think of Spark SQL transformations as mappers. The select function, for instance, is applied to a DataFrame and can only reference columns of the DataFrame it is applied to. All the arguments of a select are computed independently, so they cannot reference each other (the same goes for agg).
A much simpler example is this:
spark.range(2).select(
    (F.col("id") * 2).alias("id2"),
    F.col("id2") + 1
)
pyspark.sql.utils.AnalysisException: Column 'id2' does not exist.
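For contrast, a minimal sketch of the same computation split across two chained select calls (the second select sees id2 as an ordinary column of its input, so the reference resolves):

spark.range(2).select(
    (F.col("id") * 2).alias("id2")
).select(
    "*",
    (F.col("id2") + 1).alias("id2_plus_1")
).show()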
You have two workarounds in your situation. Either chain another transformation, as your AI suggests:
(
    r
    .agg(F.count("business_id").alias("business_count"))
    .withColumn(
        "top_rated_pct",
        F.round(F.col("business_count") / total_reviews * 100, 0)
    )
)
or simply reuse F.count("business_id") instead of its alias:
(
    r
    .agg(
        F.count("business_id").alias("business_count"),
        F.round(F.count("business_id") / total_reviews * 100, 0).alias("top_rated_pct")
    )
)
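For completeness, here is a sketch of that second workaround with the filter from your question put back in, reusing reviews_df and total_reviews from your setup:

main = (
    reviews_df
    .filter(F.col("review_stars").isin([4, 5]))
    .agg(
        F.count("business_id").alias("business_count"),
        F.round(F.count("business_id") / total_reviews * 100, 0).alias("top_rated_pct")
    )
)
main.show()
# With the sample data, 3 of the 5 reviews have 4 or 5 stars,
# so business_count is 3 and top_rated_pct is 60.0.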
In general, don't be afraid to chain select and withColumn transformations if it makes your code clearer. Spark's engine will execute them in the same stage anyway, so it won't cost you anything in terms of performance.
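If you want to convince yourself, comparing the physical plans is a quick check; here is a sketch reusing the question's setup (the exact plan text depends on your Spark version):

chained = (
    reviews_df
    .filter(F.col("review_stars").isin([4, 5]))
    .agg(F.count("business_id").alias("business_count"))
    .withColumn("top_rated_pct",
                F.round(F.col("business_count") / total_reviews * 100, 0))
)
# explain() lets you verify that the extra withColumn does not introduce
# an additional shuffle (Exchange) on top of the aggregation.
chained.explain()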