I am attempting to combine multiple gzip
streams into a single stream, my understanding is this should be possible but my implementation is flawed.
Based on what I have read, my expectation was that I should be able to remove the 10-byte header and 8-byte footer from all streams, concatenate the remaining bytes together, and reconstruct a single header and footer.
However, when I do try and do this the decompression operation fails, I am assuming this is because the .flush()
is including some information in the block about "end of data" that is not being removed.
Ideal Case
It is possible to concatenate multiple gzip
streams together without altering them. This is a valid gzip
file containing multiple streams.
Unfortunately, when using zlib.decompress(data, GZIP_WBITS)
, rather than using decompressobj
to check for an unconsumed_tail
, only the first stream is returned.
Naïve Concatenation
Example to show how concatenation might break some downstream clients consuming these files.
import zlib
# Adding 16 to the window size makes zlib use the gzip container
# (gzip header and CRC-32/length trailer) instead of the zlib wrapper.
GZIP_WBITS = 16 + zlib.MAX_WBITS


def decompress(data: bytes) -> bytes:
    """Decompress gzip-framed *data* with one ``zlib.decompress`` call.

    Decoding stops at the end of the first gzip member, so any further
    members appended after it are not included in the result.
    """
    return zlib.decompress(data, GZIP_WBITS)
def compress(data: list[bytes]) -> bytes:
    """Gzip each item on its own and return the members back to back.

    Every element of *data* becomes a complete gzip member (header,
    deflate data, trailer); the members are concatenated unmodified.
    """
    members = []
    for datum in data:
        # A fresh compressor per item, so each one gets its own header/trailer.
        deflate = zlib.compressobj(8, zlib.DEFLATED, GZIP_WBITS)
        members.append(deflate.compress(datum) + deflate.flush())
    return b"".join(members)
def test_decompression():
    """Show that a single decompress call only yields the first member."""
    data = [b"Hello", b"World!"]
    compressed = compress(data)
    decompressed = decompress(compressed)
    # this should be b"".join(data) == decompressed
    # but only the first stream comes back from the one-shot call.
    assert decompressed == data[0]
Sample Code (Not working)
import zlib
import struct
# Demonstration script: rebuilds a "double" gzip stream by appending raw
# deflate bytes, then shows that the result does not decompress.
# Relies on GZIP_WBITS from the previous snippet.
test_bytes = b"hello world"

# Create a GZIP example stream
deflate = zlib.compressobj(8, zlib.DEFLATED, GZIP_WBITS)
single = deflate.compress(test_bytes)
single += deflate.flush()

# quick sanity check that decompression works
zlib.decompress(single, GZIP_WBITS)
print("Single:", single.hex())

# check our understanding of the footer is correct:
# last 4 bytes = uncompressed length, the 4 bytes before that = CRC-32,
# both little-endian unsigned 32-bit integers
single_len = struct.unpack("<I", single[-4:])[0]
assert single_len == len(test_bytes), "wrong len"
single_crc = struct.unpack("<I", single[-8:-4])[0]
assert single_crc == zlib.crc32(test_bytes), "wrong crc"

# Create an example GZIP stream with duplicated input bytes
deflate = zlib.compressobj(8, zlib.DEFLATED, GZIP_WBITS)
double = deflate.compress(test_bytes)
double += deflate.compress(test_bytes)
double += deflate.flush()

# quick sanity check that decompression works
zlib.decompress(double, GZIP_WBITS)

# Check we can calculate the len and CRC correctly
double_length = struct.unpack("<I", double[-4:])[0]
assert double_length == len(test_bytes + test_bytes), "wrong len"
double_crc = struct.unpack("<I", double[-8:-4])[0]
assert double_crc == zlib.crc32(test_bytes + test_bytes), "wrong crc"
print(f"Double: {double.hex()}")

# Remove the header and footer from our original GZIP stream
single_data = single[10:-8]
print(f" Data: {' '*20}{single_data.hex()}")

# Concatenate the original stream with footer removed with a duplicate
# with the header and footer removed
concatenated = single[:-8] + single_data

# Add the footer, comprising the CRC and Length
concatenated += struct.pack("<I", double_crc)
concatenated += struct.pack("<I", double_length)
assert concatenated.startswith(single[:-8])
print(f" Maybe: {concatenated.hex()}")

# Confirm this is bad data: this call is expected to fail, because the
# deflate data of a finished stream cannot simply be appended to another.
zlib.decompress(concatenated, GZIP_WBITS)
My assumption is it will be possible to use the following function to combine the crc32 values:
def crc_combine(crcA, crcB, lenB):
    """Return crc32(A + B) given crc32(A), crc32(B) and len(B).

    crcA is pushed through lenB zero bytes with the pre/post inversion
    that ``zlib.crc32`` applies cancelled out, then XOR-ed with crcB.
    """
    crcA0 = zlib.crc32(b'\0' * lenB, crcA ^ 0xffffffff) ^ 0xffffffff
    return crcA0 ^ crcB
Requirements
- Pure Python with no dependencies (AWS Lambda runtime, where dependency management is a pain).
- Avoid decompressing and recompressing the streams; we have control of the original content and so can calculate the resultant CRC using the crc32_combine function.
- The resulting file can be decompressed using a single call to zlib.decompress(data, GZIP_WBITS), as the resultant files form part of a "public interface" and anything else would be considered a breaking change.
Sources
- Concatenate multiple zlib compressed data streams into a single stream efficiently
- Combining two non-pure CRC32 values
I am attempting to combine multiple gzip
streams into a single stream, my understanding is this should be possible but my implementation is flawed.
Based on what I have read, my expectation was that I should be able to remove the 10-byte header and 8-byte footer from all streams, concatenate the remaining bytes together, and reconstruct a single header and footer.
However, when I do try and do this the decompression operation fails, I am assuming this is because the .flush()
is including some information in the block about "end of data" that is not being removed.
Ideal Case
It is possible to concatenate multiple gzip
streams together without altering them. This is a valid gzip
file containing multiple streams.
Unfortunately, when using zlib.decompress(data, GZIP_WBITS)
, rather than using decompressobj
to check for an unconsumed_tail
, only the first stream is returned.
Naïve Concatenation
Example to show how concatenation might break some downstream clients consuming these files.
import zlib
# Adding 16 to the window size makes zlib use the gzip container
# (gzip header and CRC-32/length trailer) instead of the zlib wrapper.
GZIP_WBITS = 16 + zlib.MAX_WBITS


def decompress(data: bytes) -> bytes:
    """Decompress gzip-framed *data* with one ``zlib.decompress`` call.

    Decoding stops at the end of the first gzip member, so any further
    members appended after it are not included in the result.
    """
    return zlib.decompress(data, GZIP_WBITS)
def compress(data: list[bytes]) -> bytes:
    """Gzip each item on its own and return the members back to back.

    Every element of *data* becomes a complete gzip member (header,
    deflate data, trailer); the members are concatenated unmodified.
    """
    members = []
    for datum in data:
        # A fresh compressor per item, so each one gets its own header/trailer.
        deflate = zlib.compressobj(8, zlib.DEFLATED, GZIP_WBITS)
        members.append(deflate.compress(datum) + deflate.flush())
    return b"".join(members)
def test_decompression():
    """Show that a single decompress call only yields the first member."""
    data = [b"Hello", b"World!"]
    compressed = compress(data)
    decompressed = decompress(compressed)
    # this should be b"".join(data) == decompressed
    # but only the first stream comes back from the one-shot call.
    assert decompressed == data[0]
Sample Code (Not working)
import zlib
import struct
# Demonstration script: rebuilds a "double" gzip stream by appending raw
# deflate bytes, then shows that the result does not decompress.
# GZIP_WBITS is not defined here; it comes from the previous snippet.
test_bytes = b"hello world"
# Create a GZIP example stream
deflate = zlib.compressobj(8, zlib.DEFLATED, GZIP_WBITS)
single = deflate.compress(test_bytes)
single += deflate.flush()
# quick sanity check that decompression works
zlib.decompress(single, GZIP_WBITS)
print("Single:", single.hex())
# check our understanding of the footer is correct:
# last 4 bytes = uncompressed length, the 4 bytes before that = CRC-32,
# both read as little-endian unsigned 32-bit integers
single_len = struct.unpack("<I", single[-4:])[0]
assert single_len == len(test_bytes), "wrong len"
single_crc = struct.unpack("<I", single[-8:-4])[0]
assert single_crc == zlib.crc32(test_bytes), "wrong crc"
# Create an example GZIP stream with duplicated input bytes
deflate = zlib.compressobj(8, zlib.DEFLATED, GZIP_WBITS)
double = deflate.compress(test_bytes)
double += deflate.compress(test_bytes)
double += deflate.flush()
# quick sanity check that decompression works
zlib.decompress(double, GZIP_WBITS)
# Check we can calculate the len and CRC correctly
double_length = struct.unpack("<I", double[-4:])[0]
assert double_length == len(test_bytes + test_bytes), "wrong len"
double_crc = struct.unpack("<I", double[-8:-4])[0]
assert double_crc == zlib.crc32(test_bytes + test_bytes), "wrong crc"
print(f"Double: {double.hex()}")
# Remove the header and footer from our original GZIP stream
# (10 leading header bytes, 8 trailing CRC/length bytes)
single_data = single[10:-8]
print(f" Data: {' '*20}{single_data.hex()}")
# Concatenate the original stream with footer removed with a duplicate
# with the header and footer removed
concatenated = single[:-8] + single_data
# Add the footer, comprising the CRC and Length
# (taken from the known-good "double" stream above)
concatenated += struct.pack("<I", double_crc)
concatenated += struct.pack("<I", double_length)
assert concatenated .startswith(single[:-8])
print(f" Maybe: {concatenated.hex()}")
# Confirm this is bad data: this call is expected to fail, because the
# deflate data of a finished stream cannot simply be appended to another.
zlib.decompress(concatenated, GZIP_WBITS)
My assumption is it will be possible to use the following function to combine the crc32 values:
def crc_combine(crcA, crcB, lenB):
    """Return crc32(A + B) given crc32(A), crc32(B) and len(B).

    crcA is pushed through lenB zero bytes with the pre/post inversion
    that ``zlib.crc32`` applies cancelled out, then XOR-ed with crcB.
    """
    crcA0 = zlib.crc32(b'\0' * lenB, crcA ^ 0xffffffff) ^ 0xffffffff
    return crcA0 ^ crcB
Requirements
- Pure Python with no dependencies (AWS Lambda runtime, where dependency management is a pain).
- Avoid decompressing and recompressing the streams; we have control of the original content and so can calculate the resultant CRC using the crc32_combine function.
- The resulting file can be decompressed using a single call to zlib.decompress(data, GZIP_WBITS), as the resultant files form part of a "public interface" and anything else would be considered a breaking change.
Sources
- Concatenate multiple zlib compressed data streams into a single stream efficiently
- Combining two non-pure CRC32 values
- 1 No, you can't just concatenate deflate streams to make a deflate stream. That requires more shenanigans, which can be found in gzjoin. You need to find the last-block bit in the last block of the first deflate stream, clear it, bring that stream to a byte boundary with a stored block, and then you can concatenate the next deflate stream. – Mark Adler Commented Feb 6 at 20:24
- Thanks @MarkAdler, this put me on the right track! – Ben Carley Commented Feb 7 at 10:39
2 Answers
This is much easier than you're making it out to be. Simply concatenate the gzip files without removing or in any way messing with the headers and trailers. Any concatenation of gzip streams is a valid gzip stream, and will decompress to the concatenation of the uncompressed contents of the individual gzip streams.
Based on the clarification from @MarkAdler about the last-block bit and the byte boundary (and his many other supporting answers) I have stitched together this relatively simple version.
As I understand it, this approach can work as I have control over the creation of the original gzip
streams. Otherwise, I would have to go twiddling bits like gzjoin does.
Working Solution
import struct
import zlib
from typing import TypedDict
# Adding 16 to the window size makes zlib use the gzip container
# (gzip header and CRC-32/length trailer) instead of the zlib wrapper.
GZIP_WBITS = 16 + zlib.MAX_WBITS
class PrepareRet(TypedDict):
    """One pre-compressed chunk plus the metadata needed to join it later."""

    # gzip header followed by sync-flushed deflate data (no gzip trailer)
    data: bytes
    # number of uncompressed bytes the chunk represents
    length: int
    # CRC-32 of the uncompressed bytes
    crc: int
def decompress(data: bytes) -> bytes:
    """Decompress gzip-framed *data* with one ``zlib.decompress`` call."""
    return zlib.decompress(data, GZIP_WBITS)
def prepare(data: bytes) -> PrepareRet:
    """Compress *data* into a joinable chunk and record its length and CRC.

    The returned ``"data"`` is not a complete gzip file on its own: the
    compressor is only sync-flushed, never finished, so no final block
    and no CRC/length trailer are written.
    """
    deflate = zlib.compressobj(8, zlib.DEFLATED, GZIP_WBITS)
    output = deflate.compress(data)
    # use Z_SYNC_FLUSH so the streams can be concatenated: the output is
    # flushed without marking the deflate stream as ended
    output += deflate.flush(zlib.Z_SYNC_FLUSH)
    return {
        "data": output,
        "length": len(data),
        "crc": zlib.crc32(data),
    }
def crc_combine(crcA: int, crcB: int, lenB: int) -> int:
    """Return crc32(A + B) given crc32(A), crc32(B) and len(B).

    crcA is pushed through lenB zero bytes with the pre/post inversion
    that ``zlib.crc32`` applies cancelled out, then XOR-ed with crcB.
    The zero bytes are fed in bounded chunks so memory use does not grow
    with lenB.
    """
    chunk_size = 1 << 16
    # A non-positive lenB means "no bytes", as b"\0" * lenB would give b"".
    remaining = max(lenB, 0)
    zeros = bytes(min(remaining, chunk_size))
    register = crcA ^ 0xFFFFFFFF
    while remaining > 0:
        step = min(remaining, len(zeros))
        # Chaining crc32 calls is equivalent to one call over all the zeros.
        register = zlib.crc32(zeros if step == len(zeros) else zeros[:step], register)
        remaining -= step
    return (register ^ 0xFFFFFFFF) ^ crcB
def concatenate(streams: list[PrepareRet]) -> bytes:
    """Join chunks made by ``prepare`` into one single-member gzip stream.

    Each chunk's 10-byte gzip header is dropped and the deflate data is
    appended in order. One header is put in front; a final empty deflate
    block and a trailer with the combined CRC-32 and total length are
    put at the end.
    """
    total_length = 0
    combined_crc = 0
    bodies = []
    for stream in streams:
        # remove the 10byte gzip header
        bodies.append(stream["data"][10:])
        total_length += stream["length"]
        combined_crc = crc_combine(combined_crc, stream["crc"], stream["length"])

    # Compressing nothing yields: header, final empty deflate block, trailer.
    empty = zlib.compressobj(8, zlib.DEFLATED, GZIP_WBITS).flush()
    gzip_header = empty[:10]
    # The final block that terminates the joined deflate stream.
    final_block = empty[10:-8]
    # The gzip length field is the size modulo 2**32, so mask it rather
    # than let struct.pack raise once the total passes 4 GiB.
    trailer = struct.pack("<LL", combined_crc, total_length & 0xFFFFFFFF)
    return b"".join([gzip_header, *bodies, final_block, trailer])
def test_equal():
    """Joined chunks decompress, in one call, to the joined inputs."""
    data = [b"Hello", b"World!"]
    streams = [prepare(datum) for datum in data]
    compressed = concatenate(streams)
    assert b"".join(data) == decompress(compressed)