
apply pivot only on some columns in pyspark - Stack Overflow


I have the following dataframe.

from pyspark.sql import functions as F

df = spark.createDataFrame(
    [
        [1, 'AB', 12, '2022-01-01']
        , [1, 'AA', 22, '2022-01-10']
        , [1, 'AC', 11, '2022-01-11']
        , [2, 'AB', 22, '2022-02-01']
        , [2, 'AA', 28, '2022-02-10']
        , [2, 'AC', 25, '2022-02-22']
    ]
    , 'code: int, doc_type: string, amount: int, load_date: string'
)
df = df.withColumn('load_date', F.to_date('load_date'))

I want to pivot the amount but only want the first value from the date. This is what I tried, and it is not giving me the desired results.

(
    df.groupBy('code')
    .pivot('doc_type', ['AB', 'AA', 'AC'])
    .agg(F.sum('amount').alias('amnt'), F.first('load_date').alias('ldt'))
    .show()
)

+----+-------+----------+-------+----------+-------+----------+
|code|AB_amnt|    AB_ldt|AA_amnt|    AA_ldt|AC_amnt|    AC_ldt|
+----+-------+----------+-------+----------+-------+----------+
|   1|     12|2022-01-01|     22|2022-01-10|     11|2022-01-11|
|   2|     22|2022-02-01|     28|2022-02-10|     25|2022-02-22|
+----+-------+----------+-------+----------+-------+----------+

This is what I want.

(
    df.groupBy('code')
    .agg(
        F.sum(F.when(F.col('doc_type') == 'AB', F.col('amount'))).alias('AB_amnt')
        , F.sum(F.when(F.col('doc_type') == 'AA', F.col('amount'))).alias('AA_amnt')
        , F.sum(F.when(F.col('doc_type') == 'AC', F.col('amount'))).alias('AC_amnt')
        , F.first('load_date').alias('load_date')
    )
    .show()
)

+----+-------+-------+-------+----------+
|code|AB_amnt|AA_amnt|AC_amnt| load_date|
+----+-------+-------+-------+----------+
|   1|     12|     22|     11|2022-01-01|
|   2|     22|     28|     25|2022-02-01|
+----+-------+-------+-------+----------+

Is there a simpler way to do this? I have more than one column to put into the pivot, and more than one column to keep outside the pivot as well.
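One way to keep this conditional-aggregation pattern manageable for several value columns and several non-pivot columns is to build the expressions from lists. A minimal sketch, assuming hypothetical `doc_types`, `measures` and `passthrough` lists (the names are mine, not part of the question):

from pyspark.sql import functions as F

# Hypothetical lists, added for illustration.
doc_types = ['AB', 'AA', 'AC']   # pivot values
measures = ['amount']            # value columns to spread across doc_types
passthrough = ['load_date']      # columns kept once per code

exprs = [
    F.sum(F.when(F.col('doc_type') == t, F.col(m))).alias(f'{t}_{m}')  # e.g. AB_amount
    for t in doc_types
    for m in measures
] + [F.min(c).alias(c) for c in passthrough]  # min = earliest value, deterministic

df.groupBy('code').agg(*exprs).show()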

I am using Databricks 14.3 LTS with Spark 3.5.0

asked Mar 9 at 0:33 by Dhruv

1 Answer


From the question, you want the output above by pivoting.

Here are two simple ways to do it. Option 2 is the one you are looking for, since it uses the pivot function directly.

Option 1: use groupBy and agg, then join; it is self-explanatory:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F


def main():
    spark = SparkSession.builder.appName("Your Pivot").getOrCreate()
    df = spark.createDataFrame(
        [
            [1, 'AB', 12, '2022-01-01']
            , [1, 'AA', 22, '2022-01-10']
            , [1, 'AC', 11, '2022-01-11']
            , [2, 'AB', 22, '2022-02-01']
            , [2, 'AA', 28, '2022-02-10']
            , [2, 'AC', 25, '2022-02-22']
        ]
        , 'code: int, doc_type: string, amount: int, load_date: string'
    )
    df = df.withColumn('load_date', F.to_date('load_date'))
    pivoted_df = (
        df
        .groupBy('code').pivot('doc_type')
        .agg(F.sum('amount'))
        .join(
            df.groupBy('code').agg(F.first('load_date').alias('load_date')),
            'code'
        )
    )
    pivoted_df.selectExpr("code", "AB as AB_amnt", "AA as AA_amnt", "AC as AC_amnt", "load_date").show()
    spark.stop()
if __name__ == "__main__":
    main()

Result:

+----+-------+-------+-------+----------+
|code|AB_amnt|AA_amnt|AC_amnt|load_date |
+----+-------+-------+-------+----------+
|1   |12     |22     |11     |2022-01-01|
|2   |22     |28     |25     |2022-02-01|
+----+-------+-------+-------+----------+
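If the list of doc_type values is long, the selectExpr renaming can be generated rather than written by hand. A minimal sketch, assuming the pivoted_df from Option 1 and a hypothetical doc_types list:

from pyspark.sql import functions as F

doc_types = ['AB', 'AA', 'AC']  # hypothetical list of pivot values

# pivoted_df here is the joined frame from Option 1 (columns: code, AA, AB, AC, load_date)
renamed = pivoted_df.select(
    'code',
    *[F.col(t).alias(f'{t}_amnt') for t in doc_types],  # AA -> AA_amnt, AB -> AB_amnt, ...
    'load_date'
)
renamed.show()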

Option 2: use pivot and agg directly:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F


def main():
    spark = SparkSession.builder.appName("My Pivot with pivot function").getOrCreate()
    df = spark.createDataFrame(
        [
            [1, 'AB', 12, '2022-01-01']
            , [1, 'AA', 22, '2022-01-10']
            , [1, 'AC', 11, '2022-01-11']
            , [2, 'AB', 22, '2022-02-01']
            , [2, 'AA', 28, '2022-02-10']
            , [2, 'AC', 25, '2022-02-22']
        ]
        , 'code: int, doc_type: string, amount: int, load_date: string'
    )
    df = df.withColumn('load_date', F.to_date('load_date'))
    pivoted_df = (
        df
        .groupBy('code')
        .pivot('doc_type')
        .agg(F.sum('amount'))
    )
    result_df = pivoted_df.selectExpr(
        "code",
        "AB as AB_amnt",
        "AA as AA_amnt",
        "AC as AC_amnt"
    )
    result_df.join(
        df.groupBy('code').agg(F.min('load_date').alias('load_date')),  # min gives the earliest load_date per code (deterministic, unlike first)
        'code'
    ).show()
    spark.stop()


if __name__ == "__main__":
    main()

Result:

+----+-------+-------+-------+----------+
|code|AB_amnt|AA_amnt|AC_amnt| load_date|
+----+-------+-------+-------+----------+
|   1|     12|     22|     11|2022-01-01|
|   2|     22|     28|     25|2022-02-01|
+----+-------+-------+-------+----------+
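The same pivot-then-join shape extends to more than one pivoted measure and more than one non-pivot column by keeping the aggregations in lists. A minimal sketch, assuming the df from the question; the list names are mine:

from pyspark.sql import functions as F

pivot_aggs = [F.sum('amount').alias('amnt')]              # one entry per pivoted measure
non_pivot_aggs = [F.min('load_date').alias('load_date')]  # one entry per pass-through column

pivoted = df.groupBy('code').pivot('doc_type', ['AB', 'AA', 'AC']).agg(*pivot_aggs)
non_pivoted = df.groupBy('code').agg(*non_pivot_aggs)

# With a single aggregation, Spark names the pivoted columns by the pivot value
# alone (AB, AA, AC); with two or more, it appends the alias (AB_amnt, ...).
pivoted.join(non_pivoted, 'code').show()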