I have a problem processing mixed-schema JSON data that I read from an S3 bucket using PySpark. I found a solution for my problem in this question: I can specify an explicitly defined schema when reading the JSON data from an RDD into a DataFrame:
json_df: DataFrame = spark.read.schema(schema).json(json_rdd)
Since I did not find the solution before writing up my problem, I'm posting it here for reference:
My S3 bucket contains JSON data in two different versions. I'm trying to filter each version and enforce the correct schema for it using this code:
spark.createDataFrame(
    mixed_json_df.filter(col('version') == version)               # filter for the specific version
                 .select(*[col(field.name) for field in schema])  # get rid of fields not in the schema
                 .rdd,                                            # createDataFrame needs an RDD
    schema=schema                                                 # use the explicit schema
)
Where the schema is:
root
|-- jsonType: string (nullable = true)
|-- version: string (nullable = true)
|-- regionId: string (nullable = true)
|-- mainItems: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- itemId: string (nullable = true)
| | |-- itemName: string (nullable = true)
| | |-- destination: string (nullable = true)
| | |-- costCenter: string (nullable = true)
| | |-- itemIndex: integer (nullable = true)
| | |-- itemNumber: integer (nullable = true)
| | |-- totalCount: integer (nullable = true)
| | |-- segmentCount: integer (nullable = true)
| | |-- plannedSegments: integer (nullable = true)
| | |-- actualSegments: integer (nullable = true)
| | |-- endFlag: integer (nullable = true)
| | |-- itemWidth: double (nullable = true)
| | |-- itemLength: double (nullable = true)
| | |-- metricA: double (nullable = true)
| | |-- metricB: double (nullable = true)
| | |-- metricC: double (nullable = true)
| | |-- maxValue: double (nullable = true)
|-- secondaryItems: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- secondaryId: string (nullable = true)
| | |-- secondaryWidth: double (nullable = true)
| | |-- segmentCount: integer (nullable = true)
|-- startTimestamp: long (nullable = true)
|-- endTimestamp: long (nullable = true)
|-- totalDuration: integer (nullable = true)
|-- eventCount: integer (nullable = true)
|-- eventDuration: integer (nullable = true)
|-- breakCount: integer (nullable = true)
|-- breakDuration: integer (nullable = true)
|-- metricD: double (nullable = true)
|-- metricDWithEvents: double (nullable = true)
|-- metricDWithoutEvents: double (nullable = true)
|-- metricE: double (nullable = true)
|-- metricEWithEvents: double (nullable = true)
|-- metricEWithoutEvents: double (nullable = true)
|-- maxSpeed: double (nullable = true)
|-- averageSpeed: double (nullable = true)
|-- metricF: double (nullable = true)
|-- metricFPercentage: double (nullable = true)
|-- metricG: double (nullable = true)
|-- metricH: double (nullable = true)
|-- metricI: double (nullable = true)
|-- itemLengthMetric: double (nullable = true)
|-- itemWidthMetric: double (nullable = true)
|-- nominalSpeed: double (nullable = true)
|-- valueCount: integer (nullable = true)
|-- maxValueCount: double (nullable = true)
|-- itemDescription: string (nullable = true)
|-- categoryCode: string (nullable = true)
|-- mode: string (nullable = true)
|-- additionalInfo: string (nullable = true)
|-- changeType: string (nullable = true)
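For reference, an abridged sketch of how this printed schema maps to a StructType; only a few representative fields are spelled out, the remaining ones follow the same pattern, and the variable names are my own:
from pyspark.sql.types import (StructType, StructField, ArrayType,
                               StringType, IntegerType, LongType, DoubleType)

main_item = StructType([
    StructField('itemId', StringType(), True),
    StructField('itemName', StringType(), True),
    StructField('itemIndex', IntegerType(), True),
    StructField('itemWidth', DoubleType(), True),
    # ... remaining 13 mainItems fields, same pattern
])

secondary_item = StructType([
    StructField('secondaryId', StringType(), True),
    StructField('secondaryWidth', DoubleType(), True),
    StructField('segmentCount', IntegerType(), True),
])

schema = StructType([
    StructField('jsonType', StringType(), True),
    StructField('version', StringType(), True),
    StructField('regionId', StringType(), True),
    StructField('mainItems', ArrayType(main_item, True), True),
    StructField('secondaryItems', ArrayType(secondary_item, True), True),
    StructField('startTimestamp', LongType(), True),
    StructField('endTimestamp', LongType(), True),
    # ... remaining scalar fields (totalDuration through changeType)
])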
However, I'm getting an error saying that the number of objects and fields doesn't match:
pyspark.errors.exceptions.base.PySparkValueError: [LENGTH_SHOULD_BE_THE_SAME] obj and fields should be of the same length, got 35 and 17.
This suggests a mismatch between the number of fields in the data and the schema.
I'm also observing that 35 is the correct number of top-level fields in the schema, while 17 matches (coincidentally?) the number of fields in the nested structs under "mainItems".
So I'm guessing the error stems from the sub-structure somehow being interpreted incorrectly, but I can't see past that right now.
One open question is whether I'm handling all the nested fields correctly during schema enforcement; a quick check is sketched below.
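A small diagnostic I could run (the intermediate name selected_df is just for illustration): compare the schema of the filtered and selected DataFrame with the target schema before converting to an RDD.
selected_df = (mixed_json_df
               .filter(col('version') == version)
               .select(*[col(field.name) for field in schema]))

print(len(selected_df.schema.fields), len(schema.fields))  # expect 35 and 35
selected_df.printSchema()  # compare how mainItems/secondaryItems were inferred vs. the target schema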
I'm aware of this question, but it covers a much simpler case, and right now I don't see how to apply its solution to mine.
A fallback would probably be to separate the two versions by filtering the RDD before creating that mixed DataFrame, as sketched below. However, this seems like something that should be solvable at the DataFrame level, and I also suspect the DataFrame route would be faster.
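Roughly, the fallback I have in mind would look like this sketch, assuming json_rdd holds one raw JSON string per record (the helper function is just for illustration):
import json

def _matches_version(line, version):
    # parse the raw JSON record and check its version field
    try:
        return json.loads(line).get('version') == version
    except (ValueError, AttributeError):
        return False

rdd_1_0_0 = json_rdd.filter(lambda line: _matches_version(line, '1.0.0'))
rdd_1_1_0 = json_rdd.filter(lambda line: _matches_version(line, '1.1.0'))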
Reposting it as an answer: I found a solution for my problem in this question: I can specify an explicitly defined schema when reading the JSON data from an RDD into a DataFrame:
json_df: DataFrame = spark.read.schema(schema).json(json_rdd)
However, it seems that I'm now reading the data twice:
def _read_specific_version(json_rdd, version, schema):
    json_df: DataFrame = spark.read.schema(schema).json(json_rdd)
    return json_df.filter(col('version') == version)

df_1_0_0 = _read_specific_version(json_rdd, '1.0.0', schema_1_0_0)
df_1_1_0 = _read_specific_version(json_rdd, '1.1.0', schema_1_1_0)
Is there a more efficient way to do this? For instance, does this exploit parallel execution, or am I enforcing sequential execution here? Possibly a Spark newbie question.
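One idea I have, though I haven't verified that it actually helps: cache the RDD so that the second read does not have to pull the raw data from S3 again.
json_rdd = json_rdd.cache()  # keep the raw records around for reuse; my assumption is this avoids a second S3 scan

df_1_0_0 = _read_specific_version(json_rdd, '1.0.0', schema_1_0_0)
df_1_1_0 = _read_specific_version(json_rdd, '1.1.0', schema_1_1_0)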