I have a large JSON file that I do not want to load into memory. I would like to validate it against a JSON schema in a streaming fashion. All libraries I could find so far only validate completely loaded JSON objects (like Pydantic or https://github.com/python-jsonschema/jsonschema). What I rather need is some way to validate it by feeding the original JSON chunk by chunk, i.e., controlling the size of the buffer.
This could look like this:
import pydantic  # I use V2
import ijson
import pathlib

class A(pydantic.BaseModel):
    i: int
    a: list[int]
    s: str

jsonpath = pathlib.Path("some.json")
validator = MyValidator(schema=A.model_json_schema())
with jsonpath.open("rb") as file:
    for prefix, event, value in ijson.parse(file, use_float=True):
        validator.event((prefix, event, value))
print(validator.errors)
Imagine the some.json file is a ~50 MB large A instance with a very long array. I do not want to load the whole object into memory (this is what Pydantic would do), but I want to make sure that some.json complies with the schema of A. The validator.errors attribute would give me a list of errors, which would be empty in case none were discovered.
[EDIT 2025-01-31] The term "streaming fashion" means that each event is seen exactly once and there is no way to see it again. However, I am willing to accept an answer that does the validation with multiple scans, i.e., in my example above multiple file scans are fine with me.
Comments:
- I believe people who are voting to close this question do not understand the question. This is not about recommending the best library for a job, it is about whether and how something can be done in the first place. – ypnos
- @ypnos Thank you for defending my question. I may make some edits to emphasize your point. – Anton Daneyko
- @mb21 It seems the libs you have mentioned do not validate against a JSON schema, but just check that the JSON is well-formed. I am willing to give the bounty to someone who suggests even a non-Python solution, but something that allows for an "easy" creation of Python bindings. – Anton Daneyko
- Instead of asking to validate an arbitrary schema (which is not realistic), you should focus on your specific schema instead. So I suggest providing it. – Olivier
- You will find much more detailed explanations by interacting with json-schema.slack.com, where we have a whole host of experienced users and implementers available to describe intimate details of the spec and the behaviors. – Jeremy Fiel
7 Answers

You're on the right track using ijson for streaming, but the issue is that pydantic expects the entire object at once, while ijson parses in a streaming manner. To validate incrementally without loading the entire JSON into memory, you can:
- Use a custom validator that incrementally checks each field as it arrives instead of waiting for the full object.
- Validate the a list items one by one instead of collecting them all in memory.
Instead of passing the entire JSON object to Pydantic at once, parse the JSON step by step and validate it in parts.
import pydantic  # Pydantic V2
import ijson
import pathlib

class A(pydantic.BaseModel):
    i: int
    a: list[int] = []
    s: str

jsonpath = pathlib.Path("some.json")
errors = []
partial_data = {"i": None, "a": [], "s": None}

with jsonpath.open("rb") as file:
    for prefix, event, value in ijson.parse(file, use_float=True):
        if prefix == "i" and event == "number":
            partial_data["i"] = value
        elif prefix == "s" and event == "string":
            partial_data["s"] = value
        elif prefix.startswith("a.item") and event in {"number", "integer"}:
            try:
                # Validate individual array elements as they arrive
                int_value = pydantic.TypeAdapter(int).validate_python(value)
                partial_data["a"].append(int_value)
            except pydantic.ValidationError as e:
                errors.append(f"Error in 'a': {e.errors()}")

try:
    A.model_validate(partial_data, strict=True)
except pydantic.ValidationError as e:
    errors.append(e.errors())

print(errors if errors else "Validation passed")
This is the JSON Schema of some.json:
{
    "type": "object",
    "properties": {
        "i": {"type": "integer"},
        "a": {
            "type": "array",
            "items": {"type": "integer"}
        },
        "s": {"type": "string"}
    },
    "required": ["i", "a", "s"]
}
Pydantic comes with an experimental feature called "partial validation" that is designed for streaming inputs.
See https://docs.pydantic.dev/latest/concepts/experimental/#partial-validation
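For illustration, a minimal sketch under the assumption of Pydantic >= 2.10, where TypeAdapter.validate_json accepts the experimental_allow_partial flag; a list root stands in here for the long array from the question:

import pydantic

# A TypeAdapter over a supported container type (here list[int]) can be fed a
# truncated JSON prefix; the unfinished trailing element is dropped instead of
# raising, so a partially read buffer can still be checked.
ta = pydantic.TypeAdapter(list[int])

buffer = b'[1, 2, 3, 4'  # e.g. the first chunk read from a large file
print(ta.validate_json(buffer, experimental_allow_partial=True))  # -> [1, 2, 3]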
You can create a Pydantic model from an existing JSON schema using datamodel-code-generator: https://koxudaxi.github.io/datamodel-code-generator/
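If you only have the JSON Schema, the generator's command-line interface can emit the model file; a typical invocation (schema.json and model.py are placeholder names) looks roughly like:

datamodel-codegen --input schema.json --input-file-type jsonschema --output model.py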
Open issues I see right now with partial validation:
- Support is limited to specific types, and the root must be a TypeAdapter instead of a BaseModel
- Unclear how to proceed after the initial validation step when more data keeps arriving
I think we're locked in to a semi-manual approach to validation. Handling lists is something of a nightmare, but for some basic test data the code below works (except the dict handler, which I didn't test due to time constraints). Event handling looks very simple if you don't account for lists, and you'll see below that 4/5 of the code is there to account for how ijson emits events for lists.
# validator.py
import ijson
from typing import Type, Any, Set, get_type_hints
from pydantic import BaseModel

class StreamingJsonValidator:
    def __init__(self, model_class: Type[BaseModel]):
        """
        Initialize with a Pydantic model class (not an instance)
        """
        self.model_class = model_class
        self.field_types = get_type_hints(model_class)
        self.required_fields = {
            field_name for field_name, field in model_class.model_fields.items()
            if field.is_required()
        }

    def _validate_type(self, value: Any, expected_type: Type) -> bool:
        """
        Validate a value against the expected type
        """
        # Basic types
        if expected_type in (str, int, float, bool):
            return isinstance(value, expected_type)
        # Lists
        if hasattr(expected_type, "__origin__") and expected_type.__origin__ is list:
            if not isinstance(value, list):
                return False
            item_type = expected_type.__args__[0]
            return all(self._validate_type(item, item_type) for item in value)
        # Dictionaries
        if hasattr(expected_type, "__origin__") and expected_type.__origin__ is dict:
            if not isinstance(value, dict):
                return False
            key_type, value_type = expected_type.__args__
            return all(
                self._validate_type(k, key_type) and self._validate_type(v, value_type)
                for k, v in value.items()
            )
        return False

    def validate_file(self, file_path: str) -> tuple[bool, list[str]]:
        """
        Validate a JSON file
        """
        seen_fields: Set[str] = set()
        errors: list[str] = []
        current_field = None
        current_array = []
        in_array = False
        try:
            with open(file_path, 'rb') as file:
                parser = ijson.parse(file)
                for prefix, event, value in parser:
                    # New field
                    if event == 'map_key':
                        # Track list progress
                        if in_array and current_field:
                            expected_type = self.field_types[current_field]
                            if not self._validate_type(current_array, expected_type):
                                errors.append(f"Invalid type for {current_field}: expected {expected_type}, got array with invalid items")
                            seen_fields.add(current_field)
                            current_array = []
                            in_array = False
                        current_field = value
                        continue
                    # Detect start of lists
                    if current_field and event == 'start_array':
                        in_array = True
                        current_array = []
                        continue
                    if current_field and in_array and event not in ('start_array', 'end_array'):
                        current_array.append(value)
                        continue
                    # Close list
                    if current_field and event == 'end_array':
                        if current_field not in self.field_types:
                            errors.append(f"Unknown field: {current_field}")
                        elif current_field in seen_fields:
                            errors.append(f"Duplicate field: {current_field}")
                        else:
                            expected_type = self.field_types[current_field]
                            if not self._validate_type(current_array, expected_type):
                                errors.append(f"Invalid type for {current_field}: expected {expected_type}, got array with invalid items")
                            seen_fields.add(current_field)
                        current_array = []
                        in_array = False
                        current_field = None
                        continue
                    # Detect if we're looking at a complete key-value pair - necessary for list (and possibly dict) handling
                    if current_field and not in_array and event in ('number', 'string', 'boolean', 'null'):
                        if current_field not in self.field_types:
                            errors.append(f"Unknown field: {current_field}")
                        elif current_field in seen_fields:
                            errors.append(f"Duplicate field: {current_field}")
                        else:
                            expected_type = self.field_types[current_field]
                            if not self._validate_type(value, expected_type):
                                errors.append(f"Invalid type for {current_field}: expected {expected_type}, got {type(value)}")
                            seen_fields.add(current_field)
                        current_field = None
            missing_fields = self.required_fields - seen_fields
            if missing_fields:
                errors.append(f"Missing required fields: {missing_fields}")
        except Exception as e:
            errors.append(f"Error parsing JSON: {type(e).__name__} - {str(e)}")
        is_valid = len(errors) == 0
        return is_valid, errors
# test.py
from pydantic import BaseModel
from validator import StreamingJsonValidator

class A(BaseModel):
    i: int
    a: list[int] = []
    s: str

validator = StreamingJsonValidator(A)

# valid case
is_valid, errors = validator.validate_file('valid.json')
print("Expecting a valid result")
if is_valid:
    print("JSON is valid!")
else:
    print("Validation errors:")
    for error in errors:
        print(f"- {error}")

# invalid case
is_valid, errors = validator.validate_file('invalid.json')
print("Expecting an invalid result")
if is_valid:
    print("JSON is valid!")
else:
    print("Validation errors:")
    for error in errors:
        print(f"- {error}")
# valid.json
{
"i": 42,
"a": [1, 2, 3],
"s": "hello"
}
# invalid.json
{
"i": 42,
"a": [1, "2", 3],
"s": "hello"
}
- Dynamically parses fields without hardcoding each attribute.
- Validates incrementally for large lists and nested structures.
- Supports any Pydantic model.
This works for any Pydantic model and does not require manually handling fields.
import pydantic
import ijson
import pathlib
from typing import Type, Any, get_args, get_origin

class StreamingValidator:
    def __init__(self, model: Type[pydantic.BaseModel]):
        self.model = model
        # List-typed fields are accumulated item by item; scalar fields hold a single value
        self.partial_data = {field: [] if get_origin(field_info.annotation) is list else None
                             for field, field_info in model.model_fields.items()}
        self.errors = []

    def process_event(self, prefix: str, event: str, value: Any):
        """
        Process each streaming event and store values incrementally.
        """
        if event in ("start_map", "map_key", "end_map", "start_array", "end_array"):
            return  # structural events carry no field value
        field_name = prefix.split(".")[0]  # Extract root field name
        if field_name in self.partial_data:
            field_type = self.model.model_fields[field_name].annotation
            # Handle lists incrementally
            if isinstance(self.partial_data[field_name], list):
                try:
                    item_type = get_args(field_type)[0]
                    validated_value = pydantic.TypeAdapter(item_type).validate_python(value)
                    self.partial_data[field_name].append(validated_value)
                except pydantic.ValidationError as e:
                    self.errors.append(f"Error in '{field_name}': {e.errors()}")
            else:
                self.partial_data[field_name] = value

    def validate(self):
        """
        Validate the final accumulated data against the full Pydantic model.
        """
        try:
            self.model.model_validate(self.partial_data, strict=True)
        except pydantic.ValidationError as e:
            self.errors.append(e.errors())

    def get_errors(self):
        return self.errors

# Example Pydantic Model
class A(pydantic.BaseModel):
    i: int
    a: list[int]
    s: str

# Generic streaming validation
jsonpath = pathlib.Path("some.json")
validator = StreamingValidator(A)

with jsonpath.open("rb") as file:
    for prefix, event, value in ijson.parse(file, use_float=True):
        validator.process_event(prefix, event, value)

validator.validate()
print(validator.get_errors() if validator.get_errors() else "Validation passed")
Why This Works for Any Model
- Automatic Parsing: Extracts field names dynamically from the Pydantic model.
- Handles Large Lists Efficiently: Validates list elements incrementally.
- Generic for Any Model: No need to manually parse fields for each model.
- Strict Validation: Uses model_validate() for full schema enforcement.
import pydantic
import ijson
import pathlib
from typing import Type, Any, get_args, get_origin

class StreamingValidator:
    def __init__(self, model: Type[pydantic.BaseModel]):
        self.model = model
        self.errors = []

    def validate_event(self, prefix: str, event: str, value: Any):
        """
        Validate each JSON event as it arrives.
        """
        if event in ("start_map", "map_key", "end_map", "start_array", "end_array"):
            return  # structural events carry no field value
        field_name = prefix.split(".")[0]  # Extract top-level field name
        if field_name in self.model.model_fields:
            field_info = self.model.model_fields[field_name]
            field_type = field_info.annotation
            try:
                if get_origin(field_type) is list:
                    # Validate list items individually
                    pydantic.TypeAdapter(get_args(field_type)[0]).validate_python(value)
                else:
                    # Validate single field values
                    pydantic.TypeAdapter(field_type).validate_python(value)
            except pydantic.ValidationError as e:
                self.errors.append(f"Error in '{field_name}': {e.errors()}")

    def get_errors(self):
        return self.errors

# Example Pydantic Model
class A(pydantic.BaseModel):
    i: int
    a: list[int]
    s: str

# Generic streaming validation
jsonpath = pathlib.Path("some.json")
validator = StreamingValidator(A)

with jsonpath.open("rb") as file:
    for prefix, event, value in ijson.parse(file, use_float=True):
        validator.validate_event(prefix, event, value)

print(validator.get_errors() if validator.get_errors() else "Validation passed")
Using ijson for incremental parsing: you can process the file in a memory-efficient way with the ijson package, which parses JSON data iteratively. Schema validation is not built into ijson, but you can add your own validation logic while parsing and explicitly check each parsed element against the expected schema.
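A minimal sketch of that idea, assuming the layout of some.json from the question (a top-level object whose key a holds a long integer array):

import ijson
import pydantic
import pathlib

item_adapter = pydantic.TypeAdapter(int)  # expected type of each array element
errors = []

with pathlib.Path("some.json").open("rb") as file:
    # ijson.items yields one element of the "a" array at a time,
    # so the array is never materialized in memory as a whole.
    for index, item in enumerate(ijson.items(file, "a.item")):
        try:
            item_adapter.validate_python(item)
        except pydantic.ValidationError as e:
            errors.append(f"a[{index}]: {e.errors()}")

print(errors or "array items valid")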
Use pandas to read the JSON file with the chunksize param of pd.read_json(). This way you are only loading a few records (in chunks) into memory at a time; note that chunksize only applies when lines=True, i.e. the input is in JSON Lines format.
import pandas as pd

input_file = "some.jsonl"  # placeholder path to a JSON Lines file
chunk_size = 1000          # number of records per chunk

# Read the JSON file in chunks
for chunk in pd.read_json(input_file, chunksize=chunk_size, lines=True):
    chunk_results = process_chunk(chunk)  # process/validate each chunk with your own logic
Link to the doc: read_json doc
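If each line of the file is one JSON record shaped like the question's model A, validating chunk by chunk could look roughly like this (records.jsonl and the chunk size are placeholders):

import pandas as pd
import pydantic

class A(pydantic.BaseModel):
    i: int
    a: list[int]
    s: str

errors = []
for chunk in pd.read_json("records.jsonl", chunksize=1000, lines=True):
    # Each chunk is a small DataFrame; validate its rows one by one
    for row in chunk.to_dict(orient="records"):
        try:
            A.model_validate(row)
        except pydantic.ValidationError as e:
            errors.append(e.errors())

print(errors or "all records valid")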