
python - renaming nested columns inside array column leads to unwanted aggregation - Stack Overflow


I'm working with pyspark to do a data transformation that primarily renames fields / columns. The data is multiply nested, which complicates the matter. As an input, I have (simplified) data in the following format:


{
    'version': '1.1.0',
    'areaId': '056384e1-56a0-49f9-b9fa-95dd29477ce3',
    'startTimeEpoch': 1666186174621,
    'endTimeEpoch': 1666186245904,
    'waste': {'kqm': 140.185095650912, 'kqf': 140.185095650912},
    'customerOrders': [
        {
            'identifier': 'OrderIdentifier_0',
            'index': 0,
            'cuts': {'actual': 949425219, 'nominal': 751400596},
            'automaticCutControl': True,
        },
        {
            'identifier': 'OrderIdentifier_1',
            'index': 1,
            'cuts': {'actual': 1898850438, 'nominal': 1502801192},
            'automaticCutControl': False,
        },
    ],
}

As output, I want this (after a Spark collect()):

[
    Row(
        customer_orders=[
            Row(
                area_id='056384e1-56a0-49f9-b9fa-95dd29477ce3',
                start_time_epoch=1666186174621,
                customer_order_index=0,
                customer_order_identifier='OrderIdentifier_0',
                nominal_cuts=751400596,
                cuts=949425219,
                automatic_cut_control=True,
            ),
            Row(
                area_id='056384e1-56a0-49f9-b9fa-95dd29477ce3',
                start_time_epoch=1666186174621,
                customer_order_index=1,
                customer_order_identifier='OrderIdentifier_1',
                nominal_cuts=1502801192,
                cuts=1898850438,
                automatic_cut_control=False,
            )
        ]
    )
]

I'm also creating this schema with which I load the data:

from pyspark.sql.types import (
    ArrayType,
    BooleanType,
    DoubleType,
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
)

schema = StructType(
    [
        StructField('version', StringType(), True),
        StructField('areaId', StringType(), True),
        StructField('startTimeEpoch', LongType(), True),
        StructField('endTimeEpoch', LongType(), True),
        StructField(
            'waste',
            StructType(
                [
                    StructField('kqm', DoubleType(), True),
                    StructField('kqf', DoubleType(), True),
                ]
            ),
            True,
        ),
        StructField(
            'customerOrders',
            ArrayType(
                StructType(
                    [
                        StructField('identifier', StringType(), True),
                        StructField('index', IntegerType(), True),
                        StructField(
                            'cuts',
                            StructType(
                                [
                                    StructField('actual', LongType(), True),
                                    StructField('nominal', LongType(), True),
                                ]
                            ),
                            True,
                        ),
                        StructField(
                            'automaticCutControl', BooleanType(), True
                        ),
                    ]
                ),
                True,
            ),
            True,
        ),
    ]
)
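
A minimal sketch of loading with this schema, assuming an existing SparkSession and an illustrative file path ('data.json' is a placeholder):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# multiLine=True lets Spark read JSON records that span several lines,
# as each record in the input does.
demo_d = spark.read.schema(schema).json('data.json', multiLine=True)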

The inferred schema (what Spark reports when the JSON is read without an explicit schema), by contrast, looks like this:

root
 |-- areaId: string (nullable = true)
 |-- customerOrders: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
 |-- endTimeEpoch: long (nullable = true)
 |-- startTimeEpoch: long (nullable = true)
 |-- version: string (nullable = true)
 |-- waste: map (nullable = true)
 |    |-- key: string
 |    |-- value: double (valueContainsNull = true)

Now the issue with my current approach is that the structured fields inside the customer orders get aggregated across all customer orders, like this:

[
    Row(
        customer_orders=[
            Row(
                area_id='056384e1-56a0-49f9-b9fa-95dd29477ce3',
                start_time_epoch=1666186174621,
                customer_order_index=[0, 1],
                customer_order_identifier=[
                    'OrderIdentifier_0',
                    'OrderIdentifier_1',
                ],
                nominal_cuts=[751400596, 1502801192],
                cuts=[949425219, 1898850438],
                automatic_cut_control=[True, False],
            )
        ]
    )
]

This is my approach so far:

from pyspark.sql.functions import array, col, struct

demo_d.select(
    array(
        [
            struct(
                col('areaId').alias('area_id'),
                col('startTimeEpoch').alias('start_time_epoch'),
                col('customerOrders.index').alias('customer_order_index'),
                col('customerOrders.identifier').alias(
                    'customer_order_identifier'
                ),
                col('customerOrders.cuts.nominal').alias('nominal_cuts'),
                col('customerOrders.cuts.actual').alias('cuts'),
                col('customerOrders.automaticCutControl').alias(
                    'automatic_cut_control'
                ),
            )
        ]
    ).alias('customer_orders')
).collect()

What is it that I need to do differently?

asked Mar 20 at 16:28 by pmaier-bhs
  • Comment (lihao, Mar 21 at 12:43): check the transform function, no need to do explode + groupBy. (A sketch of this approach appears after the answer below.)

1 Answer


I found this solution. Selecting a field such as customerOrders.index directly on the array column returns that field's values for all orders as a single array, which is what caused the unwanted aggregation. Exploding customerOrders first yields one row per order; the renamed struct can then be built per row and the orders collected back together:

from pyspark.sql.functions import array, col, collect_list, explode, struct

# One row per customer order; 'customer_order' holds the order struct.
exploded_df = demo_d.select(
    'areaId',
    'startTimeEpoch',
    explode('customerOrders').alias('customer_order'),
)

# Rebuild each order as a renamed struct, wrapped in a single-element array.
exploded_df = exploded_df.select(
    array(
        struct(
            col('areaId').alias('area_id'),
            col('startTimeEpoch').alias('start_time_epoch'),
            col('customer_order.index').alias('customer_order_index'),
            col('customer_order.identifier').alias(
                'customer_order_identifier'
            ),
            col('customer_order.cuts.nominal').alias('nominal_cuts'),
            col('customer_order.cuts.actual').alias('cuts'),
            col('customer_order.automaticCutControl').alias(
                'automatic_cut_control'
            ),
        )
    ).alias('customer_orders')
)

# Collect the per-order rows back into one list per area / start time.
agg_customers_df = exploded_df.groupBy(
    'customer_orders.area_id', 'customer_orders.start_time_epoch'
).agg(collect_list('customer_orders').alias('customer_orders'))

# The grouping keys come back as single-element arrays (they were read
# through the array-wrapped struct), so unwrap them.
columns_to_transform = ['area_id', 'start_time_epoch']

for column in columns_to_transform:
    agg_customers_df = agg_customers_df.withColumn(
        column, col(column).getItem(0)
    )

final_df = agg_customers_df

final_df.collect()
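
As the comment above suggests, the explode + groupBy round trip can be avoided with the higher-order transform function, which maps an expression over every element of an array column. A minimal sketch of that approach (assuming Spark 3.1+, where transform is available in the Python API, and the demo_d DataFrame from the question):

from pyspark.sql import functions as F

result = demo_d.select(
    F.transform(
        'customerOrders',
        # The lambda is evaluated once per array element; outer columns
        # such as areaId can be referenced alongside the element's fields.
        lambda order: F.struct(
            F.col('areaId').alias('area_id'),
            F.col('startTimeEpoch').alias('start_time_epoch'),
            order['index'].alias('customer_order_index'),
            order['identifier'].alias('customer_order_identifier'),
            order['cuts']['nominal'].alias('nominal_cuts'),
            order['cuts']['actual'].alias('cuts'),
            order['automaticCutControl'].alias('automatic_cut_control'),
        ),
    ).alias('customer_orders')
)

result.collect()

Because the array is transformed in place, the orders stay in a single flat array per input row and no re-grouping step is needed.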
