Skip to content

Commit

Permalink
upgrade snappy framing format
Browse files Browse the repository at this point in the history
  • Loading branch information
jtolio committed Feb 9, 2013
1 parent e898f76 commit 50ea5ab
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 31 deletions.
28 changes: 17 additions & 11 deletions snappy.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
compress, decompress, isValidCompressed, uncompress, \
_crc32c

_CHUNK_MAX = 32768
_CHUNK_MAX = 65536
_STREAM_TO_STREAM_BLOCK_SIZE = _CHUNK_MAX
_STREAM_IDENTIFIER = b"sNaPpY"
_COMPRESSED_CHUNK = 0x00
Expand Down Expand Up @@ -104,8 +104,8 @@ def add_chunk(self, data, compress=None):
"""
if not self._header_chunk_written:
self._header_chunk_written = True
out = [struct.pack("<BH", _IDENTIFIER_CHUNK,
len(_STREAM_IDENTIFIER)),
out = [struct.pack("<B", _IDENTIFIER_CHUNK),
struct.pack("<L", len(_STREAM_IDENTIFIER))[:-1],
_STREAM_IDENTIFIER]
else:
out = []
Expand All @@ -126,8 +126,10 @@ def add_chunk(self, data, compress=None):
chunk_type = _COMPRESSED_CHUNK
else:
chunk_type = _UNCOMPRESSED_CHUNK
out.append(struct.pack("<BHL", chunk_type, len(chunk) + 4, crc))
out.append(chunk)
out.extend([struct.pack("<B", chunk_type),
struct.pack("<L", len(chunk) + 4)[:-1],
struct.pack("<L", crc),
chunk])
return b"".join(out)

def compress(self, data):
Expand Down Expand Up @@ -180,25 +182,29 @@ def decompress(self, data):
self._buf += data
uncompressed = []
while True:
if len(self._buf) < 3:
if len(self._buf) < 4:
return b"".join(uncompressed)
chunk_type, size = struct.unpack("<BH", self._buf[:3])
chunk_type = struct.unpack("<B", self._buf[:1])[0]
size = struct.unpack("<L", self._buf[1:4] + "\0")[0]
if not self._header_found:
if (chunk_type != _IDENTIFIER_CHUNK and
if (chunk_type != _IDENTIFIER_CHUNK or
size != len(_STREAM_IDENTIFIER)):
raise UncompressError("stream missing snappy identifier")
self._header_found = True
if (_RESERVED_UNSKIPPABLE[0] <= chunk_type and
chunk_type < _RESERVED_UNSKIPPABLE[1]):
raise UncompressError(
"stream received unskippable but unknown chunk")
if len(self._buf) < 3 + size:
if len(self._buf) < 4 + size:
return b"".join(uncompressed)
chunk, self._buf = self._buf[3:3 + size], self._buf[3 + size:]
chunk, self._buf = self._buf[4:4 + size], self._buf[4 + size:]
if chunk_type == _IDENTIFIER_CHUNK and chunk != _STREAM_IDENTIFIER:
raise UncompressError("stream has invalid snappy identifier")
if (_RESERVED_SKIPPABLE[0] <= chunk_type and
chunk_type < _RESERVED_SKIPPABLE[1]):
continue
assert chunk_type in (_COMPRESSED_CHUNK, _UNCOMPRESSED_CHUNK)
if chunk_type not in (_COMPRESSED_CHUNK, _UNCOMPRESSED_CHUNK):
raise UncompressError("internal error")
crc, chunk = chunk[:4], chunk[4:]
if chunk_type == _COMPRESSED_CHUNK:
chunk = _uncompress(chunk)
Expand Down
42 changes: 22 additions & 20 deletions test_snappy.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,12 @@ def test_random(self):
data = b""
compressed = b""
for _ in range(random.randint(0, 3)):
chunk = os.urandom(random.randint(0, 65536))
chunk = os.urandom(random.randint(0, snappy._CHUNK_MAX * 2))
data += chunk
compressed += compressor.add_chunk(
chunk, compress=random.choice([True, False, None]))

upper_bound = random.choice([256, 65536])
upper_bound = random.choice([256, snappy._CHUNK_MAX * 2])
while compressed:
size = random.randint(0, upper_bound)
chunk, compressed = compressed[:size], compressed[size:]
Expand All @@ -124,44 +124,46 @@ def test_compression(self):
self.assertEqual(crc, b"\x8f)H\xbd")
self.assertEqual(len(compressed_data), 6)
self.assertEqual(compressor.add_chunk(data, compress=True),
b"\xff\x06\x00sNaPpY"
b"\x00\x0a\x00" + crc + compressed_data)
b"\xff\x06\x00\x00sNaPpY"
b"\x00\x0a\x00\x00" + crc + compressed_data)

# test that we can add uncompressed chunks
data = b"\x01" * 50
crc = struct.pack("<L", snappy._masked_crc32c(data))
self.assertEqual(crc, b"\xb2\x14)\x8a")
self.assertEqual(compressor.add_chunk(data, compress=False),
b"\x01\x36\x00" + crc + data)
b"\x01\x36\x00\x00" + crc + data)

# test that we can add more data than will fit in one chunk
data = b"\x01" * 65531
crc1 = struct.pack("<L", snappy._masked_crc32c(data[:32768]))
self.assertEqual(crc1, b"g\xc9\t\xea")
crc2 = struct.pack("<L", snappy._masked_crc32c(data[32768:]))
self.assertEqual(crc2, b"\xbb\xe9\xc3k")
data = b"\x01" * (snappy._CHUNK_MAX * 2 - 5)
crc1 = struct.pack("<L",
snappy._masked_crc32c(data[:snappy._CHUNK_MAX]))
self.assertEqual(crc1, b"h#6\x8e")
crc2 = struct.pack("<L",
snappy._masked_crc32c(data[snappy._CHUNK_MAX:]))
self.assertEqual(crc2, b"q\x8foE")
self.assertEqual(compressor.add_chunk(data, compress=False),
b"\x01\x04\x80" + crc1 + data[:32768] +
b"\x01\xff\x7f" + crc2 + data[32768:])
b"\x01\x04\x00\x01" + crc1 + data[:snappy._CHUNK_MAX] +
b"\x01\xff\xff\x00" + crc2 + data[snappy._CHUNK_MAX:])

def test_decompression(self):
# test that we check for the initial stream identifier
data = b"\x01" * 50
self.assertRaises(snappy.UncompressError,
snappy.StreamDecompressor().decompress,
b"\x01\x36\x00" +
b"\x01\x36\x00\00" +
struct.pack("<L", snappy._masked_crc32c(data)) + data)
self.assertEqual(
snappy.StreamDecompressor().decompress(
b"\xff\x06\x00sNaPpY"
b"\x01\x36\x00" +
b"\xff\x06\x00\x00sNaPpY"
b"\x01\x36\x00\x00" +
struct.pack("<L", snappy._masked_crc32c(data)) + data),
data)
decompressor = snappy.StreamDecompressor()
decompressor.decompress(b"\xff\x06\x00sNaPpY")
decompressor.decompress(b"\xff\x06\x00\x00sNaPpY")
self.assertEqual(
decompressor.copy().decompress(
b"\x01\x36\x00" +
b"\x01\x36\x00\x00" +
struct.pack("<L", snappy._masked_crc32c(data)) + data),
data)

Expand All @@ -179,14 +181,14 @@ def test_decompression(self):
fake_crc = os.urandom(4)
self.assertRaises(snappy.UncompressError,
decompressor.copy().decompress,
b"\x00\x0a\x00" + fake_crc + compressed_data)
b"\x00\x0a\x00\x00" + fake_crc + compressed_data)
self.assertEqual(
decompressor.copy().decompress(
b"\x00\x0a\x00" + real_crc + compressed_data),
b"\x00\x0a\x00\x00" + real_crc + compressed_data),
data)

def test_concatenation(self):
data1 = os.urandom(65536)
data1 = os.urandom(snappy._CHUNK_MAX * 2)
data2 = os.urandom(4096)
decompressor = snappy.StreamDecompressor()
self.assertEqual(
Expand Down

0 comments on commit 50ea5ab

Please sign in to comment.