Skip to content

Commit

Permalink
Add tests for _ZlibDecompressor
Browse files Browse the repository at this point in the history
  • Loading branch information
rhpvorderman committed Oct 3, 2022
1 parent 9d60339 commit 63a2623
Showing 1 changed file with 168 additions and 0 deletions.
168 changes: 168 additions & 0 deletions Lib/test/test_zlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,7 @@ def test_wbits(self):
)
self.assertEqual(expected, actual)


def choose_lines(source, number, seed=None, generator=random):
"""Return a list of number lines randomly chosen from the source"""
if seed is not None:
Expand Down Expand Up @@ -944,6 +945,173 @@ def choose_lines(source, number, seed=None, generator=random):
"""


class ZlibDecompressorTest():
# Test adopted from test_bz2.py
TEXT = HAMLET_SCENE
DATA = zlib.compress(HAMLET_SCENE)
BAD_DATA = b"Not a valid deflate block"
def test_Constructor(self):
self.assertRaises(TypeError, zlib._ZlibDecompressor, 42)

def testDecompress(self):
zlibd = zlib._ZlibDecompressor()
self.assertRaises(TypeError, zlibd.decompress)
text = zlibd.decompress(self.DATA)
self.assertEqual(text, self.TEXT)

def testDecompressChunks10(self):
zlibd = zlib._ZlibDecompressor()
text = b''
n = 0
while True:
str = self.DATA[n*10:(n+1)*10]
if not str:
break
text += zlibd.decompress(str)
n += 1
self.assertEqual(text, self.TEXT)

def testDecompressUnusedData(self):
zlibd = zlib._ZlibDecompressor()
unused_data = b"this is unused data"
text = zlibd.decompress(self.DATA+unused_data)
self.assertEqual(text, self.TEXT)
self.assertEqual(zlibd.unused_data, unused_data)

def testEOFError(self):
zlibd = zlib._ZlibDecompressor()
text = zlibd.decompress(self.DATA)
self.assertRaises(EOFError, zlibd.decompress, b"anything")
self.assertRaises(EOFError, zlibd.decompress, b"")

@support.skip_if_pgo_task
@bigmemtest(size=_4G + 100, memuse=3.3)
def testDecompress4G(self, size):
# "Test zlib._ZlibDecompressor.decompress() with >4GiB input"
blocksize = 10 * 1024 * 1024
block = random.randbytes(blocksize)
try:
data = block * (size // blocksize + 1)
compressed = zlib.compress(data)
zlibd = zlib._ZlibDecompressor()
decompressed = zlibd.decompress(compressed)
self.assertTrue(decompressed == data)
finally:
data = None
compressed = None
decompressed = None

def testPickle(self):
for proto in range(pickle.HIGHEST_PROTOCOL + 1):
with self.assertRaises(TypeError):
pickle.dumps(zlib._ZlibDecompressor(), proto)

def testDecompressorChunksMaxsize(self):
zlibd = zlib._ZlibDecompressor()
max_length = 100
out = []

# Feed some input
len_ = len(self.BIG_DATA) - 64
out.append(zlibd.decompress(self.BIG_DATA[:len_],
max_length=max_length))
self.assertFalse(zlibd.needs_input)
self.assertEqual(len(out[-1]), max_length)

# Retrieve more data without providing more input
out.append(zlibd.decompress(b'', max_length=max_length))
self.assertFalse(zlibd.needs_input)
self.assertEqual(len(out[-1]), max_length)

# Retrieve more data while providing more input
out.append(zlibd.decompress(self.BIG_DATA[len_:],
max_length=max_length))
self.assertLessEqual(len(out[-1]), max_length)

# Retrieve remaining uncompressed data
while not zlibd.eof:
out.append(zlibd.decompress(b'', max_length=max_length))
self.assertLessEqual(len(out[-1]), max_length)

out = b"".join(out)
self.assertEqual(out, self.BIG_TEXT)
self.assertEqual(zlibd.unused_data, b"")

def test_decompressor_inputbuf_1(self):
# Test reusing input buffer after moving existing
# contents to beginning
zlibd = zlib._ZlibDecompressor()
out = []

# Create input buffer and fill it
self.assertEqual(zlibd.decompress(self.DATA[:100],
max_length=0), b'')

# Retrieve some results, freeing capacity at beginning
# of input buffer
out.append(zlibd.decompress(b'', 2))

# Add more data that fits into input buffer after
# moving existing data to beginning
out.append(zlibd.decompress(self.DATA[100:105], 15))

# Decompress rest of data
out.append(zlibd.decompress(self.DATA[105:]))
self.assertEqual(b''.join(out), self.TEXT)

def test_decompressor_inputbuf_2(self):
# Test reusing input buffer by appending data at the
# end right away
zlibd = zlib._ZlibDecompressor()
out = []

# Create input buffer and empty it
self.assertEqual(zlibd.decompress(self.DATA[:200],
max_length=0), b'')
out.append(zlibd.decompress(b''))

# Fill buffer with new data
out.append(zlibd.decompress(self.DATA[200:280], 2))

# Append some more data, not enough to require resize
out.append(zlibd.decompress(self.DATA[280:300], 2))

# Decompress rest of data
out.append(zlibd.decompress(self.DATA[300:]))
self.assertEqual(b''.join(out), self.TEXT)

def test_decompressor_inputbuf_3(self):
# Test reusing input buffer after extending it

zlibd = zlib._ZlibDecompressor()
out = []

# Create almost full input buffer
out.append(zlibd.decompress(self.DATA[:200], 5))

# Add even more data to it, requiring resize
out.append(zlibd.decompress(self.DATA[200:300], 5))

# Decompress rest of data
out.append(zlibd.decompress(self.DATA[300:]))
self.assertEqual(b''.join(out), self.TEXT)

def test_failure(self):
zlibd = zlib._ZlibDecompressor()
self.assertRaises(Exception, zlibd.decompress, self.BAD_DATA * 30)
# Previously, a second call could crash due to internal inconsistency
self.assertRaises(Exception, zlibd.decompress, self.BAD_DATA * 30)

@support.refcount_test
def test_refleaks_in___init__(self):
gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount')
zlibd = zlib._ZlibDecompressor()
refs_before = gettotalrefcount()
for i in range(100):
zlibd.__init__()
self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10)


class CustomInt:
def __index__(self):
return 100
Expand Down

0 comments on commit 63a2623

Please sign in to comment.