I have the following dataframe.
# Sample data: one row per (code, doc_type) pair with an amount and a date
# given as a 'YYYY-MM-DD' string.
df = spark.createDataFrame(
[
[1, 'AB', 12, '2022-01-01']
, [1, 'AA', 22, '2022-01-10']
, [1, 'AC', 11, '2022-01-11']
, [2, 'AB', 22, '2022-02-01']
, [2, 'AA', 28, '2022-02-10']
, [2, 'AC', 25, '2022-02-22']
]
# Schema string: load_date is declared as a string at creation time.
, 'code: int, doc_type: string, amount: int, load_date: string'
)
# Overwrite load_date with the result of F.to_date applied to the string column.
df = df.withColumn('load_date', F.to_date('load_date'))
I want to pivot the amount, but I want only the first value of the date (a single date per code, not one per pivoted doc_type). This is what I tried, and it is not giving me the desired results.
# Attempt: both aggregates are passed to the pivot's agg(), so every listed
# doc_type value gets its own amount column and its own date column
# (see the printed table that follows).
(
df.groupBy('code')
.pivot('doc_type', ['AB', 'AA', 'AC'])
.agg(F.sum('amount').alias('amnt'), F.first('load_date').alias('ldt'))
.show()
)
+----+-------+----------+-------+----------+-------+----------+
|code|AB_amnt| AB_ldt|AA_amnt| AA_ldt|AC_amnt| AC_ldt|
+----+-------+----------+-------+----------+-------+----------+
| 1| 12|2022-01-01| 22|2022-01-10| 11|2022-01-11|
| 2| 22|2022-02-01| 28|2022-02-10| 25|2022-02-22|
+----+-------+----------+-------+----------+-------+----------+
This is what I want.
# Desired result written by hand: one conditional sum per doc_type, plus a
# single load_date per code that is aggregated outside the per-doc_type sums.
(
df.groupBy('code')
.agg(
# Each sum only receives the amount of rows whose doc_type matches.
F.sum(F.when(F.col('doc_type') == 'AB', F.col('amount'))).alias('AB_amnt')
, F.sum(F.when(F.col('doc_type') == 'AA', F.col('amount'))).alias('AA_amnt')
, F.sum(F.when(F.col('doc_type') == 'AC', F.col('amount'))).alias('AC_amnt')
# NOTE(review): F.first is used here without any explicit ordering; confirm
# whether the earliest date (F.min) is what is actually intended.
, F.first('load_date').alias('load_date')
)
.show()
)
+----+-------+-------+-------+----------+
|code|AB_amnt|AA_amnt|AC_amnt| load_date|
+----+-------+-------+-------+----------+
| 1| 12| 22| 11|2022-01-01|
| 2| 22| 28| 25|2022-02-01|
+----+-------+-------+-------+----------+
Is there a simpler way to do this? I have more than one column to pivot, and also more than one column to aggregate outside the pivot.
I am using Databricks 14.3 LTS with Spark 3.5.0
I have the following dataframe.
# Sample data: one row per (code, doc_type) pair with an amount and a date
# given as a 'YYYY-MM-DD' string.
df = spark.createDataFrame(
[
[1, 'AB', 12, '2022-01-01']
, [1, 'AA', 22, '2022-01-10']
, [1, 'AC', 11, '2022-01-11']
, [2, 'AB', 22, '2022-02-01']
, [2, 'AA', 28, '2022-02-10']
, [2, 'AC', 25, '2022-02-22']
]
# Schema string: load_date is declared as a string at creation time.
, 'code: int, doc_type: string, amount: int, load_date: string'
)
# Overwrite load_date with the result of F.to_date applied to the string column.
df = df.withColumn('load_date', F.to_date('load_date'))
I want to pivot the amount, but I want only the first value of the date (a single date per code, not one per pivoted doc_type). This is what I tried, and it is not giving me the desired results.
# Attempt: both aggregates are passed to the pivot's agg(), so every listed
# doc_type value gets its own amount column and its own date column
# (see the printed table that follows).
(
df.groupBy('code')
.pivot('doc_type', ['AB', 'AA', 'AC'])
.agg(F.sum('amount').alias('amnt'), F.first('load_date').alias('ldt'))
.show()
)
+----+-------+----------+-------+----------+-------+----------+
|code|AB_amnt| AB_ldt|AA_amnt| AA_ldt|AC_amnt| AC_ldt|
+----+-------+----------+-------+----------+-------+----------+
| 1| 12|2022-01-01| 22|2022-01-10| 11|2022-01-11|
| 2| 22|2022-02-01| 28|2022-02-10| 25|2022-02-22|
+----+-------+----------+-------+----------+-------+----------+
This is what I want.
# Desired result written by hand: one conditional sum per doc_type, plus a
# single load_date per code that is aggregated outside the per-doc_type sums.
(
df.groupBy('code')
.agg(
# Each sum only receives the amount of rows whose doc_type matches.
F.sum(F.when(F.col('doc_type') == 'AB', F.col('amount'))).alias('AB_amnt')
, F.sum(F.when(F.col('doc_type') == 'AA', F.col('amount'))).alias('AA_amnt')
, F.sum(F.when(F.col('doc_type') == 'AC', F.col('amount'))).alias('AC_amnt')
# NOTE(review): F.first is used here without any explicit ordering; confirm
# whether the earliest date (F.min) is what is actually intended.
, F.first('load_date').alias('load_date')
)
.show()
)
+----+-------+-------+-------+----------+
|code|AB_amnt|AA_amnt|AC_amnt| load_date|
+----+-------+-------+-------+----------+
| 1| 12| 22| 11|2022-01-01|
| 2| 22| 28| 25|2022-02-01|
+----+-------+-------+-------+----------+
Is there a simpler way to do this? I have more than one column to pivot, and also more than one column to aggregate outside the pivot.
I am using Databricks 14.3 LTS with Spark 3.5.0
Share · asked Mar 9 at 0:33 · Dhruv · 479 reputation · 3 silver badges · 22 bronze badges · 0
1 Answer · Reset to default
0
From the question, you need to produce the output above by pivoting.
Here are simple ways to do that.
I have two options below; Option 2 is the one you are looking for, using the pivot function directly.
Option 1: use groupBy, agg, and join (it is self-explanatory):
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
def main():
    """Pivot ``amount`` by ``doc_type`` per ``code`` and attach one date per code.

    Builds the sample DataFrame, sums ``amount`` into one column per document
    type, joins the earliest ``load_date`` of each ``code`` back on ``code``,
    and prints the result with untruncated cells.

    The Spark session is stopped even if building or showing the result fails.
    """
    spark = SparkSession.builder.appName("Your Pivot").getOrCreate()
    try:
        df = spark.createDataFrame(
            [
                [1, 'AB', 12, '2022-01-01'],
                [1, 'AA', 22, '2022-01-10'],
                [1, 'AC', 11, '2022-01-11'],
                [2, 'AB', 22, '2022-02-01'],
                [2, 'AA', 28, '2022-02-10'],
                [2, 'AC', 25, '2022-02-22'],
            ],
            'code: int, doc_type: string, amount: int, load_date: string',
        )
        df = df.withColumn('load_date', F.to_date('load_date'))

        # Listing the pivot values up front spares Spark the extra job that
        # discovers them, fixes the column order, and guarantees that every
        # column referenced below exists even if a doc_type has no rows.
        doc_types = ['AB', 'AA', 'AC']
        amounts = (
            df
            .groupBy('code')
            .pivot('doc_type', doc_types)
            .agg(F.sum('amount'))
        )

        # F.min rather than F.first: without an explicit ordering F.first may
        # return any row's date after a shuffle, whereas the minimum is always
        # the earliest load_date of the code.
        first_dates = df.groupBy('code').agg(F.min('load_date').alias('load_date'))

        pivoted_df = amounts.join(first_dates, 'code')

        # With a single aggregate the pivoted columns are named after the
        # pivot values only, so the "_amnt" suffix is added here.
        pivoted_df.selectExpr(
            "code",
            *[f"{doc_type} as {doc_type}_amnt" for doc_type in doc_types],
            "load_date",
        ).show(truncate=False)
    finally:
        spark.stop()


if __name__ == "__main__":
    main()
Result :
C:\Users\ramgh\AppData\Local\Microsoft\WindowsApps\python3.8.exe C:\Users\ramgh\Downloads\spark-3.1.2-bin-hadoop3.2\python_pyspark\Pivot.py
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/Users/ramgh/AppData/Local/Packages/PythonSoftwareFoundation.Python.3.8_qbz5n2kfra8p0/LocalCache/local-packages/Python38/site-packages/pyspark/jars/slf4j-log4j12-1.7.30.jar!//slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/Users/ramgh/AppData/Local/Packages/PythonSoftwareFoundation.Python.3.8_qbz5n2kfra8p0/LocalCache/local-packages/Python38/site-packages/pyspark/jars/slf4j-simple-1.7.36.jar!//slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/09 19:40:03 WARN ProcfsMetricsGetter: Exception when trying to compute pagesize, as a result reporting of ProcessTree metrics is stopped
+----+-------+-------+-------+----------+
|code|AB_amnt|AA_amnt|AC_amnt|load_date |
+----+-------+-------+-------+----------+
|1 |12 |22 |11 |2022-01-01|
|2 |22 |28 |25 |2022-02-01|
+----+-------+-------+-------+----------+
Option 2 : Use pivot and agg directly
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
def main():
    """Pivot ``amount`` by ``doc_type`` and join the earliest date per ``code``.

    Builds the sample DataFrame, pivots the summed ``amount`` into one column
    per document type, renames those columns with an ``_amnt`` suffix, joins
    the minimum ``load_date`` of each ``code`` on ``code``, and prints the
    result.

    The Spark session is stopped even if building or showing the result fails.
    """
    spark = SparkSession.builder.appName("My Pivot with pivot function").getOrCreate()
    try:
        df = spark.createDataFrame(
            [
                [1, 'AB', 12, '2022-01-01'],
                [1, 'AA', 22, '2022-01-10'],
                [1, 'AC', 11, '2022-01-11'],
                [2, 'AB', 22, '2022-02-01'],
                [2, 'AA', 28, '2022-02-10'],
                [2, 'AC', 25, '2022-02-22'],
            ],
            'code: int, doc_type: string, amount: int, load_date: string',
        )
        df = df.withColumn('load_date', F.to_date('load_date'))

        # Explicit pivot values avoid the extra job Spark otherwise runs to
        # collect the distinct doc_type values, and they guarantee the AB/AA/AC
        # columns exist (as nulls) even when a doc_type is missing from the
        # data, so the renaming below cannot fail on an unknown column.
        pivoted_df = (
            df
            .groupBy('code')
            .pivot('doc_type', ['AB', 'AA', 'AC'])
            .agg(F.sum('amount'))
        )

        # A single aggregate yields columns named after the pivot values only;
        # rename them to the requested "<doc_type>_amnt" form.
        result_df = pivoted_df.selectExpr(
            "code",
            "AB as AB_amnt",
            "AA as AA_amnt",
            "AC as AC_amnt",
        )

        # One date per code: the earliest load_date, computed outside the pivot
        # and joined back on the grouping key.
        earliest_dates = df.groupBy('code').agg(F.min('load_date').alias('load_date'))
        result_df.join(earliest_dates, 'code').show()
    finally:
        spark.stop()


if __name__ == "__main__":
    main()
Result :
+----+-------+-------+-------+----------+
|code|AB_amnt|AA_amnt|AC_amnt| load_date|
+----+-------+-------+-------+----------+
| 1| 12| 22| 11|2022-01-01|
| 2| 22| 28| 25|2022-02-01|
+----+-------+-------+-------+----------+