
Using Python, how do I validate JSON against a JSON schema in a streaming fashion, i.e., without loading the whole object into memory?


I have a large JSON file that I do not want to load into memory. I would like to validate it against a JSON schema in a streaming fashion. All libraries I could find so far only validate completely loaded JSON objects (like Pydantic or https://github.com/python-jsonschema/jsonschema). What I need instead is a way to validate it by feeding in the original JSON chunk by chunk, i.e., with control over the size of the buffer.

It could look something like this:

import pydantic  # I use V2
import ijson
import pathlib

class A(pydantic.BaseModel):
    i: int
    a: list[int]
    s: str

jsonpath = pathlib.Path("some.json")
validator = MyValidator(schema=A.model_json_schema())
with jsonpath.open("rb") as file:
    for prefix, event, value in ijson.parse(file, use_float=True):
        validator.event((prefix, event, value))

print(validator.errors)

Imagine some.json is a ~50 MB instance of A with a very long array. I do not want to load the whole object into memory (this is what Pydantic would do), but I want to make sure that some.json complies with the schema of A. validator.errors would then give me a list of errors, which would be empty if none were discovered.

[EDIT 2025-01-31] The term "streaming fashion" means that each event is seen exactly once and there is no way to see it again. However, I am willing to accept an answer that does the validation with multiple scans, i.e., in my example above multiple passes over the file are fine with me.

Asked Jan 26 at 18:17 by Anton Daneyko, edited Feb 3 at 19:22
  • I believe people who are voting to close this question do not understand the question. This is not about recommending the best library for a job, it is about if and how something can be done in the first place. – ypnos Commented Jan 26 at 22:23
  • @ypnos Thank you for defending my question. I may make some edits to emphasize your point. – Anton Daneyko Commented Jan 28 at 16:16
  • 1 @mb21 It seems the libs you have mentioned do not validate against a JSON schema, but just check that the JSON is well-formed. I am willing to give the bounty to someone who suggests even a non-Python solution, as long as it allows for an "easy" creation of Python bindings. – Anton Daneyko Commented Jan 29 at 17:24
  • 1 Instead of asking to validate an arbitrary schema (which is not realistic), you should focus on your specific schema instead. So I suggest providing it. – Olivier Commented Jan 30 at 11:18
  • 1 You will find much more detailed explanations on json-schema.slack.com, where we have a whole host of experienced users and implementers available to describe the intimate details of the spec and its behaviors. – Jeremy Fiel Commented Jan 31 at 22:58

7 Answers


You're on the right track using ijson for streaming, but the issue is that pydantic expects the entire object at once, while ijson parses in a streaming manner. To validate incrementally without loading the entire JSON into memory, you can:

  1. Use a custom validator that incrementally checks each field as it arrives instead of waiting for the full object.
  2. Validate the items of the a list one by one instead of collecting them all in memory.

Instead of passing the entire JSON object to Pydantic at once, parse the JSON step-by-step and validate in parts.

import pydantic  # Pydantic V2
import ijson
import pathlib

class A(pydantic.BaseModel):
    i: int
    a: list[int] = []
    s: str

jsonpath = pathlib.Path("some.json")

errors = []
partial_data = {"i": None, "a": [], "s": None}

with jsonpath.open("rb") as file:
    for prefix, event, value in ijson.parse(file, use_float=True):
        if prefix == "i" and event == "number":
            partial_data["i"] = value
        elif prefix == "s" and event == "string":
            partial_data["s"] = value
        elif prefix.startswith("a.item") and event in {"number", "integer"}:
            try:
                # Validate individual array elements as they arrive, without
                # requiring the other required fields ("i", "s") to be present yet
                int_value = pydantic.TypeAdapter(int).validate_python(value, strict=True)
                partial_data["a"].append(int_value)
            except pydantic.ValidationError as e:
                errors.append(f"Error in 'a': {e.errors()}")

try:
    A.model_validate(partial_data, strict=True)
except pydantic.ValidationError as e:
    errors.append(e.errors())

print(errors if errors else "Validation passed")

For reference, this is the JSON Schema of some.json (i.e., the schema of A):

{
    "type": "object",
    "properties": {
        "i": {"type": "integer"},
        "a": {
            "type": "array",
            "items": {"type": "integer"}
        },
        "s": {"type": "string"}
    },
    "required": ["i", "a", "s"]
}

Pydantic comes with an experimental feature called "partial validation" that is designed for stream inputs.

See https://docs.pydantic.dev/latest/concepts/experimental/#partial-validation

You can create a Pydantic model from an existing JSON schema using datamodel-code-generator: https://koxudaxi.github.io/datamodel-code-generator/

Open issues I see right now with this method:

  1. Support is limited to specific types and the root must be a TypeAdapter instead of a BaseModel
  2. Unclear how to proceed after the initial validation step, with consecutive incoming data
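
Here is a minimal sketch of how the partial-validation API can be used, assuming Pydantic >= 2.10 (where validate_json accepts an experimental_allow_partial flag) and a hypothetical Item model used purely for illustration:

import pydantic

class Item(pydantic.BaseModel):
    i: int
    s: str

# The root must be a TypeAdapter, not a BaseModel
ta = pydantic.TypeAdapter(list[Item])

# The JSON below is cut off mid-stream; with experimental_allow_partial the
# complete leading items are validated and the trailing fragment is ignored
chunk = '[{"i": 1, "s": "a"}, {"i": 2, "s": "b"}, {"i": 3,'
items = ta.validate_json(chunk, experimental_allow_partial=True)
print(items)  # [Item(i=1, s='a'), Item(i=2, s='b')]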

I think we're locked in to a semi-manual approach to validation.

Handling lists is something of a nightmare, but for some basic test data the code below works (except the dict handler, which I didn't test due to time constraints). Event handling looks very simple if you don't account for lists, and you'll see below that about 4/5 of the code is there to account for how ijson emits events for lists.

# validator.py

import ijson
from typing import Type, Any, Set, get_type_hints
from pydantic import BaseModel

class StreamingJsonValidator:
    def __init__(self, model_class: Type[BaseModel]):
        """
        Initialize with a Pydantic model (not instance)
        """
        self.model_class = model_class
        self.field_types = get_type_hints(model_class)
        self.required_fields = {
            field_name for field_name, field in model_class.model_fields.items()
            if field.is_required()  # is_required is a method in Pydantic V2
        }

    def _validate_type(self, value: Any, expected_type: Type) -> bool:
        """
        Validate a value against the expected type
        """
        # Basic types
        if expected_type in (str, int, float, bool):
            return isinstance(value, expected_type)

        # Lists
        if hasattr(expected_type, "__origin__") and expected_type.__origin__ is list:
            if not isinstance(value, list):
                return False
            item_type = expected_type.__args__[0]
            return all(self._validate_type(item, item_type) for item in value)

        # Dictionaries
        if hasattr(expected_type, "__origin__") and expected_type.__origin__ is dict:
            if not isinstance(value, dict):
                return False
            key_type, value_type = expected_type.__args__
            return all(
                self._validate_type(k, key_type) and self._validate_type(v, value_type)
                for k, v in value.items()
            )

        return False

    def validate_file(self, file_path: str) -> tuple[bool, list[str]]:
        """
        Validate a JSON file
        """
        seen_fields: Set[str] = set()
        errors: list[str] = []
        current_field = None
        current_array = []
        in_array = False

        try:
            with open(file_path, 'rb') as file:
                parser = ijson.parse(file)

                for prefix, event, value in parser:
                    # New field
                    if event == 'map_key':

                        # Track list progress
                        if in_array and current_field:
                            expected_type = self.field_types[current_field]
                            if not self._validate_type(current_array, expected_type):
                                errors.append(f"Invalid type for {current_field}: expected {expected_type}, got array with invalid items")
                            seen_fields.add(current_field)
                            current_array = []
                            in_array = False

                        current_field = value
                        continue

                    # Detect start of lists
                    if current_field and event == 'start_array':
                        in_array = True
                        current_array = []
                        continue

                    if current_field and in_array and event not in ('start_array', 'end_array'):
                        current_array.append(value)
                        continue

                    # Close list
                    if current_field and event == 'end_array':
                        if current_field not in self.field_types:
                            errors.append(f"Unknown field: {current_field}")
                        elif current_field in seen_fields:
                            errors.append(f"Duplicate field: {current_field}")
                        else:
                            expected_type = self.field_types[current_field]
                            if not self._validate_type(current_array, expected_type):
                                errors.append(f"Invalid type for {current_field}: expected {expected_type}, got array with invalid items")
                            seen_fields.add(current_field)
                        current_array = []
                        in_array = False
                        current_field = None
                        continue

                    # Detect if we're looking at a complete key-value pair - necessary for list (and possibly dict) handling
                    if current_field and not in_array and event in ('number', 'string', 'boolean', 'null'):
                        if current_field not in self.field_types:
                            errors.append(f"Unknown field: {current_field}")
                        elif current_field in seen_fields:
                            errors.append(f"Duplicate field: {current_field}")
                        else:
                            expected_type = self.field_types[current_field]
                            if not self._validate_type(value, expected_type):
                                errors.append(f"Invalid type for {current_field}: expected {expected_type}, got {type(value)}")
                            seen_fields.add(current_field)
                        current_field = None

                missing_fields = self.required_fields - seen_fields
                if missing_fields:
                    errors.append(f"Missing required fields: {missing_fields}")

        except Exception as e:
            errors.append(f"Error parsing JSON: {type(e).__class__.__name__} - {str(e)}")

        is_valid = len(errors) == 0
        return is_valid, errors
# test.py

from pydantic import BaseModel
from validator import StreamingJsonValidator  # assumes validator.py is importable from the working directory

class A(BaseModel):
    i: int
    a: list[int] = []
    s: str

validator = StreamingJsonValidator(A)

# valid case
is_valid, errors = validator.validate_file('valid.json')

print("Expecting a valid result")
if is_valid:
    print("JSON is valid!")
else:
    print("Validation errors:")
    for error in errors:
        print(f"- {error}")

# invalid case
is_valid, errors = validator.validate_file('invalid.json')

print("Expecting an invalid result")
if is_valid:
    print("JSON is valid!")
else:
    print("Validation errors:")
    for error in errors:
        print(f"- {error}")
# valid.json

{
    "i": 42,
    "a": [1, 2, 3],
    "s": "hello"
}
# invalid.json

{
    "i": 42,
    "a": [1, "2", 3],
    "s": "hello"
}

  1. Dynamically parses fields without hardcoding each attribute.
  2. Validates incrementally for large lists and nested structures.
  3. Supports any Pydantic model.

This works for any Pydantic model and does not require manually handling fields.

import pydantic
import ijson
import pathlib
from typing import Type, Any, get_args, get_origin

class StreamingValidator:
    def __init__(self, model: Type[pydantic.BaseModel]):
        self.model = model
        self.partial_data = {field: [] if get_origin(field_info.annotation) is list else None
                             for field, field_info in model.model_fields.items()}
        self.errors = []

    def process_event(self, prefix: str, event: str, value: Any):
        """
        Process each streaming event and store values incrementally.
        """
        # Structural events (start_map, map_key, start_array, ...) carry no field value
        if event in ("start_map", "end_map", "map_key", "start_array", "end_array"):
            return

        field_name = prefix.split(".")[0]  # Extract root field name
        if field_name in self.partial_data:
            field_type = self.model.model_fields[field_name].annotation

            # Handle lists incrementally: validate each item against the item type
            if isinstance(self.partial_data[field_name], list):
                item_type = get_args(field_type)[0] if get_args(field_type) else Any
                try:
                    validated_value = pydantic.TypeAdapter(item_type).validate_python(value)
                    self.partial_data[field_name].append(validated_value)
                except pydantic.ValidationError as e:
                    self.errors.append(f"Error in '{field_name}': {e.errors()}")
            else:
                self.partial_data[field_name] = value

    def validate(self):
        """
        Validate the final accumulated data against the full Pydantic model.
        """
        try:
            self.model.model_validate(self.partial_data, strict=True)
        except pydantic.ValidationError as e:
            self.errors.append(e.errors())

    def get_errors(self):
        return self.errors


# Example Pydantic Model
class A(pydantic.BaseModel):
    i: int
    a: list[int]
    s: str

# Generic streaming validation
jsonpath = pathlib.Path("some.json")
validator = StreamingValidator(A)

with jsonpath.open("rb") as file:
    for prefix, event, value in ijson.parse(file, use_float=True):
        validator.process_event(prefix, event, value)

validator.validate()
print(validator.get_errors() if validator.get_errors() else "Validation passed")

Why This Works for Any Model

  • Automatic Parsing: Extracts field names dynamically from the Pydantic model.
  • Handles Large Lists Efficiently: Validates list elements incrementally.
  • Generic for Any Model: No need to manually parse fields for each model.
  • Strict Validation: Uses model_validate() for full schema enforcement.

import pydantic
import ijson
import pathlib
from typing import Type, Any, get_args, get_origin

class StreamingValidator:
    def __init__(self, model: Type[pydantic.BaseModel]):
        self.model = model
        self.errors = []

    def validate_event(self, prefix: str, event: str, value: Any):
        """
        Validate each JSON event as it arrives.
        """
        # Structural events (start_map, map_key, start_array, ...) carry no field value
        if event in ("start_map", "end_map", "map_key", "start_array", "end_array"):
            return

        field_name = prefix.split(".")[0]  # Extract top-level field name

        if field_name in self.model.model_fields:
            field_info = self.model.model_fields[field_name]
            field_type = field_info.annotation

            try:
                if get_origin(field_type) is list:
                    # Validate list items individually against the item type
                    pydantic.TypeAdapter(get_args(field_type)[0]).validate_python(value)
                else:
                    # Validate single field values
                    pydantic.TypeAdapter(field_type).validate_python(value)
            except pydantic.ValidationError as e:
                self.errors.append(f"Error in '{field_name}': {e.errors()}")

    def get_errors(self):
        return self.errors

# Example Pydantic Model
class A(pydantic.BaseModel):
    i: int
    a: list[int]
    s: str

# Generic streaming validation
jsonpath = pathlib.Path("some.json")
validator = StreamingValidator(A)

with jsonpath.open("rb") as file:
    for prefix, event, value in ijson.parse(file, use_float=True):
        validator.validate_event(prefix, event, value)

print(validator.get_errors() if validator.get_errors() else "Validation passed")

Using ijson for incremental parsing: you can process the file in a memory-efficient way with the ijson package, which enables iterative parsing of JSON data. Schema validation is not built into ijson, but you can add your own validation logic while parsing, explicitly comparing each parsed element against the expected schema.
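
As a minimal sketch of that approach, assuming the model A from the question (i: int, a: list[int], s: str) and that the ijson backend emits "number"/"integer" events for numeric values:

import ijson

errors = []
seen = set()

with open("some.json", "rb") as f:
    for prefix, event, value in ijson.parse(f, use_float=True):
        if prefix in ("i", "a", "s"):
            seen.add(prefix)
        if prefix == "i" and event not in ("number", "integer"):
            errors.append(f"'i' must be an integer, got event {event!r}")
        elif prefix == "s" and event != "string":
            errors.append(f"'s' must be a string, got event {event!r}")
        elif prefix == "a" and event not in ("start_array", "end_array"):
            errors.append(f"'a' must be an array, got event {event!r}")
        elif prefix == "a.item" and event not in ("number", "integer"):
            # Note: telling ints apart from floats may also require checking type(value)
            errors.append(f"items of 'a' must be integers, got event {event!r}")

missing = {"i", "a", "s"} - seen
if missing:
    errors.append(f"Missing required fields: {missing}")

print(errors or "Validation passed")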

Use pandas to read the JSON file; you can pass the chunksize param to pd.read_json(). This way you are only loading a few records (one chunk) at a time into memory. Note that chunksize requires lines=True, i.e., newline-delimited JSON.

import pandas as pd

input_file = "some.json"  # newline-delimited JSON, one record per line
chunk_size = 1000

# Read the JSON file in chunks; only chunk_size records are in memory at a time
for chunk in pd.read_json(input_file, chunksize=chunk_size, lines=True):
    process_chunk(chunk)  # placeholder: validate/process each chunk here

Link to the doc: https://pandas.pydata.org/docs/reference/api/pandas.read_json.html
