I'm working with PySpark on a data transformation that primarily renames fields/columns. The data is nested on multiple levels, which complicates the matter. As input, I have (simplified) data in the following format:
{
    'version': '1.1.0',
    'areaId': '056384e1-56a0-49f9-b9fa-95dd29477ce3',
    'startTimeEpoch': 1666186174621,
    'endTimeEpoch': 1666186245904,
    'waste': {'kqm': 140.185095650912, 'kqf': 140.185095650912},
    'customerOrders': [
        {
            'identifier': 'OrderIdentifier_0',
            'index': 0,
            'cuts': {'actual': 949425219, 'nominal': 751400596},
            'automaticCutControl': True,
        },
        {
            'identifier': 'OrderIdentifier_1',
            'index': 1,
            'cuts': {'actual': 1898850438, 'nominal': 1502801192},
            'automaticCutControl': False,
        },
    ],
}
As output, I want this (after a Spark collect()):
[
    Row(
        customer_orders=[
            Row(
                area_id='056384e1-56a0-49f9-b9fa-95dd29477ce3',
                start_time_epoch=1666186174621,
                customer_order_index=0,
                customer_order_identifier='OrderIdentifier_0',
                nominal_cuts=751400596,
                cuts=949425219,
                automatic_cut_control=True,
            ),
            Row(
                area_id='056384e1-56a0-49f9-b9fa-95dd29477ce3',
                start_time_epoch=1666186174621,
                customer_order_index=1,
                customer_order_identifier='OrderIdentifier_1',
                nominal_cuts=1502801192,
                cuts=1898850438,
                automatic_cut_control=False,
            )
        ]
    )
]
I also create this schema, which I use to load the data:
schema = StructType(
    [
        StructField('version', StringType(), True),
        StructField('areaId', StringType(), True),
        StructField('startTimeEpoch', LongType(), True),
        StructField('endTimeEpoch', LongType(), True),
        StructField(
            'waste',
            StructType(
                [
                    StructField('kqm', DoubleType(), True),
                    StructField('kqf', DoubleType(), True),
                ]
            ),
            True,
        ),
        StructField(
            'customerOrders',
            ArrayType(
                StructType(
                    [
                        StructField('identifier', StringType(), True),
                        StructField('index', IntegerType(), True),
                        StructField(
                            'cuts',
                            StructType(
                                [
                                    StructField('actual', LongType(), True),
                                    StructField('nominal', LongType(), True),
                                ]
                            ),
                            True,
                        ),
                        StructField('automaticCutControl', BooleanType(), True),
                    ]
                ),
                True,
            ),
            True,
        ),
    ]
)
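For reference, this is roughly how the example record could be loaded with that explicit schema. It is only a minimal sketch; the names record, spark, and demo_d are assumptions rather than something stated in the question, and reading a JSON file via spark.read.schema(schema).json(...) would work just as well:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# `record` is assumed to hold the nested dict shown at the top of the question.
demo_d = spark.createDataFrame([record], schema=schema)
demo_d.printSchema()  # follows the explicit schema instead of the inferred one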
The inferred schema, by contrast, is this:
root
 |-- areaId: string (nullable = true)
 |-- customerOrders: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
 |-- endTimeEpoch: long (nullable = true)
 |-- startTimeEpoch: long (nullable = true)
 |-- version: string (nullable = true)
 |-- waste: map (nullable = true)
 |    |-- key: string
 |    |-- value: double (valueContainsNull = true)
Now the issue with my current approach is that the struct fields inside the customer orders get aggregated across the customer orders, like this:
[
    Row(
        customer_orders=[
            Row(
                area_id='056384e1-56a0-49f9-b9fa-95dd29477ce3',
                start_time_epoch=1666186174621,
                customer_order_index=[0, 1],
                customer_order_identifier=[
                    'OrderIdentifier_0',
                    'OrderIdentifier_1',
                ],
                nominal_cuts=[751400596, 1502801192],
                cuts=[949425219, 1898850438],
                automatic_cut_control=[True, False],
            )
        ]
    )
]
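The reason seems to be that selecting a field through an array-of-structs column in Spark returns an array with one value per element, so each col('customerOrders.<field>') reference in my select pulls in the values of all orders at once. A quick check (assuming demo_d is loaded as sketched above):

from pyspark.sql.functions import col

# Projecting a field through the array<struct> column yields an array,
# one entry per customer order, rather than a scalar per order:
demo_d.select(col('customerOrders.index')).show()
# -> a single row containing [0, 1]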
This is my approach so far:
demo_d.select(
    array(
        [
            struct(
                col('areaId').alias('area_id'),
                col('startTimeEpoch').alias('start_time_epoch'),
                col('customerOrders.index').alias('customer_order_index'),
                col('customerOrders.identifier').alias(
                    'customer_order_identifier'
                ),
                col('customerOrders.cuts.nominal').alias('nominal_cuts'),
                col('customerOrders.cuts.actual').alias('cuts'),
                col('customerOrders.automaticCutControl').alias(
                    'automatic_cut_control'
                ),
            )
        ]
    ).alias('customer_orders')
).collect()
What is it that I need to do differently?
Comment from lihao: check the transform function, no need to do explode + groupby (a sketch of this approach follows the answer below).
1 Answer
I found this solution:
# Imports assumed by this snippet:
from pyspark.sql.functions import array, col, collect_list, explode, struct

# Assumed prior step (not shown in the original answer): explode
# customerOrders so that each order becomes its own row.
exploded_df = demo_d.withColumn('customer_order', explode('customerOrders'))

exploded_df = exploded_df.select(
    array(
        struct(
            col('areaId').alias('area_id'),
            col('startTimeEpoch').alias('start_time_epoch'),
            col('customer_order.index').alias('customer_order_index'),
            col('customer_order.identifier').alias(
                'customer_order_identifier'
            ),
            col('customer_order.cuts.nominal').alias('nominal_cuts'),
            col('customer_order.cuts.actual').alias('cuts'),
            col('customer_order.automaticCutControl').alias(
                'automatic_cut_control'
            ),
        )
    ).alias('customer_orders')
)

agg_customers_df = exploded_df.groupBy(
    'customer_orders.area_id', 'customer_orders.start_time_epoch'
).agg(collect_list('customer_orders').alias('customer_orders'))

columns_to_transform = ['area_id', 'start_time_epoch']
for column in columns_to_transform:
    agg_customers_df = agg_customers_df.withColumn(
        column, col(column).getItem(0)
    )

final_df = agg_customers_df
final_df.collect()
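As the comment on the question suggests, the same reshaping can also be done without explode + groupBy by mapping over the array with transform. This is only a sketch of that alternative rather than the posted solution, and it assumes Spark 3.1+ (for the Python transform API) and that demo_d is the DataFrame loaded with the explicit schema from the question:

from pyspark.sql.functions import col, struct, transform

result = demo_d.select(
    transform(
        'customerOrders',
        lambda order: struct(
            col('areaId').alias('area_id'),
            col('startTimeEpoch').alias('start_time_epoch'),
            order['index'].alias('customer_order_index'),
            order['identifier'].alias('customer_order_identifier'),
            order['cuts']['nominal'].alias('nominal_cuts'),
            order['cuts']['actual'].alias('cuts'),
            order['automaticCutControl'].alias('automatic_cut_control'),
        ),
    ).alias('customer_orders')
)
result.collect()  # one Row per input record, each holding a list of order Rows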