
How to concatenate GZIP streams in python? - Stack Overflow


I am attempting to combine multiple gzip streams into a single stream. My understanding is that this should be possible, but my implementation is flawed.

Based on what I have read, my expectation was that I should be able to remove the 10-byte header and 8-byte footer from each stream, concatenate the remaining bytes together, and reconstruct a single header and footer.

However, when I try this the decompression operation fails. I assume this is because .flush() writes some "end of data" information into the final block that is not being removed.

Ideal Case

It is possible to concatenate multiple gzip streams together without altering them; the result is a valid gzip file containing multiple streams.

Unfortunately, when zlib.decompress(data, GZIP_WBITS) is used, rather than looping with a decompressobj and checking unused_data, only the first stream is returned.
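For reference, this is a sketch (my own, not from the original question) of the kind of loop a consumer would otherwise need, using decompressobj and checking unused_data after each member:

import zlib

GZIP_WBITS = 16 + zlib.MAX_WBITS

def decompress_all_members(data: bytes) -> bytes:
    # Hypothetical helper: each decompressobj stops at the end of one gzip
    # member and leaves the remaining bytes in .unused_data, so loop until
    # nothing is left.
    output = b""
    while data:
        member = zlib.decompressobj(GZIP_WBITS)
        output += member.decompress(data)
        data = member.unused_data
    return output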

Naïve Concatenation

An example to show how naïve concatenation can break downstream clients consuming these files.

import zlib

GZIP_WBITS = 16 + zlib.MAX_WBITS

def decompress(data: bytes) -> bytes:
    return zlib.decompress(data, GZIP_WBITS)


def compress(data: list[bytes]) -> bytes:
    output = b""
    for datum in data:
        deflate = zlib.compressobj(8, zlib.DEFLATED, GZIP_WBITS)
        output += deflate.compress(datum)
        output += deflate.flush()
    return output


def test_decompression():
    data = [b"Hello", b"World!"]
    compressed = compress(data)
    decompressed = decompress(compressed)
    # ideally decompressed == b"".join(data), but only the first stream is returned
    assert decompressed == data[0]

Sample Code (Not working)

import zlib
import struct

GZIP_WBITS = 16 + zlib.MAX_WBITS

test_bytes = b"hello world"

# create a GZIP example stream
deflate = zlib.compressobj(8, zlib.DEFLATED, GZIP_WBITS)
single = deflate.compress(test_bytes)
single += deflate.flush()
# quick sanity check that decompression works
zlib.decompress(single, GZIP_WBITS)
print("Single:", single.hex())

# check our understanding of the footer is correct
single_len = struct.unpack("<I", single[-4:])[0]
assert single_len == len(test_bytes), "wrong len"
single_crc = struct.unpack("<I", single[-8:-4])[0]
assert single_crc == zlib.crc32(test_bytes), "wrong crc"

# Create an example GZIP stream with duplicated input bytes
deflate = zlib.compressobj(8, zlib.DEFLATED, GZIP_WBITS)
double = deflate.compress(test_bytes)
double += deflate.compress(test_bytes)
double += deflate.flush()
# quick sanity check that decompression works
zlib.decompress(double, GZIP_WBITS)

# Check we can calculate the length and CRC correctly
double_length = struct.unpack("<I", double[-4:])[0]
assert double_length == len(test_bytes + test_bytes), "wrong len"
double_crc = struct.unpack("<I", double[-8:-4])[0]
assert double_crc == zlib.crc32(test_bytes + test_bytes), "wrong crc"
print(f"Double: {double.hex()}")

# Remove the header and footer from our original GZIP stream
single_data = single[10:-8]
print(f"  Data: {' '*20}{single_data.hex()}")

# Concatenate the original stream (footer removed) with a second copy
# that has both the header and footer removed
concatenated = single[:-8] + single_data

# Add the footer, comprising the CRC and Length
concatenated += struct.pack("<I", double_crc)
concatenated += struct.pack("<I", double_length)
assert concatenated.startswith(single[:-8])
print(f" Maybe: {concatenated.hex()}")

# Confirm this is bad data (this call raises an error)
zlib.decompress(concatenated, GZIP_WBITS)

My assumption is that it will be possible to use the following function to combine the CRC-32 values:

def crc_combine(crcA, crcB, lenB):
    crcA0 = zlib.crc32(b'\0' * lenB, crcA ^ 0xffffffff) ^ 0xffffffff
    return crcA0 ^ crcB
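A quick sanity check (my own example, not part of the question) that combining two partial CRCs this way matches a CRC computed over the joined bytes:

import zlib

a, b = b"hello ", b"world"
# combining the individual CRCs should equal the CRC of the concatenated data
combined = crc_combine(zlib.crc32(a), zlib.crc32(b), len(b))
assert combined == zlib.crc32(a + b)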

Requirements

  • Pure Python with no dependencies (AWS Lambda runtime, where dependency management is a pain)
  • Avoid decompressing and recompressing the streams; we have control of the original content, so the resultant CRC can be calculated using the crc_combine function.
  • The resulting file must be decompressable with a single call to zlib.decompress(data, GZIP_WBITS); the files form part of a "public interface", and requiring anything more would be considered a breaking change.

Sources

  • Concatenate multiple zlib compressed data streams into a single stream efficiently
  • Combining two non-pure CRC32 values

Comments
  • No, you can't just concatenate deflate streams to make a deflate stream. That requires more shenanigans, which can be found in gzjoin. You need to find the last-block bit in the last block of the first deflate stream, clear it, bring that stream to a byte boundary with a stored block, and then you can concatenate the next deflate stream. – Mark Adler Commented Feb 6 at 20:24
  • Thanks @MarkAdler, this put me on the right track! – Ben Carley Commented Feb 7 at 10:39

2 Answers


This is much easier than you're making it out to be. Simply concatenate the gzip files without removing or in any way messing with the headers and trailers. Any concatenation of gzip streams is a valid gzip stream, and will decompress to the concatenation of the uncompressed contents of the individual gzip streams.
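As a minimal illustration of that point (my own example, assuming the downstream consumer can use the standard gzip module, which reads every member of a multi-member file):

import gzip

part_a = gzip.compress(b"Hello")
part_b = gzip.compress(b"World!")

# plain concatenation of complete gzip members is itself a valid gzip file;
# gzip.decompress reads all of them, unlike a single zlib.decompress call
assert gzip.decompress(part_a + part_b) == b"HelloWorld!"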

Based on the clarification from @MarkAdler about the last-block bit and the byte boundary (and his many other supporting answers), I have stitched together this relatively simple version.

As I understand it, this approach works because I have control over the creation of the original gzip streams. Otherwise, I would have to go twiddling bits like gzjoin does.

Working Solution

import struct
import zlib
from typing import TypedDict

GZIP_WBITS = 16 + zlib.MAX_WBITS


class PrepareRet(TypedDict):
    data: bytes
    length: int
    crc: int


def decompress(data: bytes) -> bytes:
    return zlib.decompress(data, GZIP_WBITS)


def prepare(data: bytes) -> PrepareRet:
    deflate = zlib.compressobj(8, zlib.DEFLATED, GZIP_WBITS)
    output = deflate.compress(data)
    # use Z_SYNC_FLUSH so the deflate data ends on a byte boundary without
    # the final-block bit set, allowing the streams to be concatenated
    output += deflate.flush(zlib.Z_SYNC_FLUSH)
    return {
        "data": output,
        "length": len(data),
        "crc": zlib.crc32(data),
    }


def crc_combine(crcA: int, crcB: int, lenB: int):
    crcA0 = zlib.crc32(b"\0" * lenB, crcA ^ 0xFFFFFFFF) ^ 0xFFFFFFFF
    return crcA0 ^ crcB


def concatenate(streams: list[PrepareRet]) -> bytes:
    output = bytearray()
    total_length = 0
    combined_crc = 0

    for stream in streams:
        # remove the 10-byte gzip header
        output += stream["data"][10:]
        total_length += stream["length"]
        combined_crc = crc_combine(combined_crc, stream["crc"], stream["length"])

    # Compress an empty stream to borrow its gzip header and final trailer bytes
    compressor = zlib.compressobj(8, zlib.DEFLATED, GZIP_WBITS)
    empty = compressor.flush()

    # Get the gzip header bytes
    gzip_header = empty[:10]

    # Final (Z_FINISH) deflate block from the empty stream, then CRC and length
    trailer = empty[10:-8]
    trailer += struct.pack("<L", combined_crc)
    trailer += struct.pack("<L", total_length)
    return gzip_header + output + trailer


def test_equal():
    data = [b"Hello", b"World!"]
    streams = [prepare(datum) for datum in data]
    compressed = concatenate(streams)
    assert b"".join(data) == decompress(compressed)
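

# As a follow-up check (my addition, reusing the functions above): the 8-byte
# trailer of the combined file should carry the CRC-32 and length of the full
# payload, and the file should still decompress with a single zlib.decompress call.
def test_trailer():
    data = [b"Hello", b", ", b"World!"]
    combined = concatenate([prepare(datum) for datum in data])
    payload = b"".join(data)
    # single-call decompression still works
    assert decompress(combined) == payload
    # trailer: CRC-32 then length, both little-endian
    assert struct.unpack("<L", combined[-8:-4])[0] == zlib.crc32(payload)
    assert struct.unpack("<L", combined[-4:])[0] == len(payload)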