From 11e770dad34a60f9e36e7b626538eaa1167f6a2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Rou=C3=A9l?= Date: Mon, 6 Apr 2026 21:01:57 +0200 Subject: [PATCH 1/2] GH-3466 Improve `RunLengthBitPackingHybridDecoder.readNext` to avoid per-call buffer allocation and `DataInputStream` wrapping --- .../rle/RunLengthBitPackingHybridDecoder.java | 32 +++++++++++++++---- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java index e55b276b29..59eaa2b0e0 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java @@ -18,7 +18,7 @@ */ package org.apache.parquet.column.values.rle; -import java.io.DataInputStream; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import org.apache.parquet.Preconditions; @@ -48,6 +48,8 @@ private static enum MODE { private int currentCount; private int currentValue; private int[] currentBuffer; + private int currentBufferLength; + private byte[] packedBytes; public RunLengthBitPackingHybridDecoder(int bitWidth, InputStream in) { LOG.debug("decoding bitWidth {}", bitWidth); @@ -69,7 +71,7 @@ public int readInt() throws IOException { result = currentValue; break; case PACKED: - result = currentBuffer[currentBuffer.length - 1 - currentCount]; + result = currentBuffer[currentBufferLength - 1 - currentCount]; break; default: throw new ParquetDecodingException("not a valid mode " + mode); @@ -90,21 +92,39 @@ private void readNext() throws IOException { case PACKED: int numGroups = header >>> 1; currentCount = numGroups * 8; + currentBufferLength = currentCount; LOG.debug("reading {} values BIT PACKED", currentCount); - currentBuffer = new int[currentCount]; // TODO: reuse a buffer - byte[] bytes = new byte[numGroups * bitWidth]; + if (currentBuffer == null || currentBuffer.length < currentCount) { + currentBuffer = new int[currentCount]; + } + int bytesNeeded = numGroups * bitWidth; + if (packedBytes == null || packedBytes.length < bytesNeeded) { + packedBytes = new byte[bytesNeeded]; + } // At the end of the file RLE data though, there might not be that many bytes left. int bytesToRead = (int) Math.ceil(currentCount * bitWidth / 8.0); bytesToRead = Math.min(bytesToRead, in.available()); - new DataInputStream(in).readFully(bytes, 0, bytesToRead); + readFully(in, packedBytes, bytesToRead); for (int valueIndex = 0, byteIndex = 0; valueIndex < currentCount; valueIndex += 8, byteIndex += bitWidth) { - packer.unpack8Values(bytes, byteIndex, currentBuffer, valueIndex); + packer.unpack8Values(packedBytes, byteIndex, currentBuffer, valueIndex); } break; default: throw new ParquetDecodingException("not a valid mode " + mode); } } + + private static void readFully(InputStream in, byte[] buf, int len) throws IOException { + int offset = 0; + while (offset < len) { + int read = in.read(buf, offset, len - offset); + if (read < 0) { + throw new EOFException( + "Unexpected end of stream: still needed " + (len - offset) + " bytes"); + } + offset += read; + } + } } From 75979e3d900076af2d3d1ec341d74e68d841c140 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Rou=C3=A9l?= Date: Mon, 6 Apr 2026 21:55:09 +0200 Subject: [PATCH 2/2] GH-3466 Fix stale packed-byte tail corruption in RLE/bit-pack decoder --- .../rle/RunLengthBitPackingHybridDecoder.java | 22 +++------ .../TestRunLengthBitPackingHybridEncoder.java | 45 +++++++++++++++++++ 2 files changed, 50 insertions(+), 17 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java index 59eaa2b0e0..b715d00af1 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java @@ -18,9 +18,9 @@ */ package org.apache.parquet.column.values.rle; -import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.util.Arrays; import org.apache.parquet.Preconditions; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.values.bitpacking.BytePacker; @@ -101,10 +101,10 @@ private void readNext() throws IOException { if (packedBytes == null || packedBytes.length < bytesNeeded) { packedBytes = new byte[bytesNeeded]; } - // At the end of the file RLE data though, there might not be that many bytes left. - int bytesToRead = (int) Math.ceil(currentCount * bitWidth / 8.0); - bytesToRead = Math.min(bytesToRead, in.available()); - readFully(in, packedBytes, bytesToRead); + int bytesRead = in.readNBytes(packedBytes, 0, bytesNeeded); + if (bytesRead < bytesNeeded) { + Arrays.fill(packedBytes, bytesRead, bytesNeeded, (byte) 0); + } for (int valueIndex = 0, byteIndex = 0; valueIndex < currentCount; valueIndex += 8, byteIndex += bitWidth) { @@ -115,16 +115,4 @@ private void readNext() throws IOException { throw new ParquetDecodingException("not a valid mode " + mode); } } - - private static void readFully(InputStream in, byte[] buf, int len) throws IOException { - int offset = 0; - while (offset < len) { - int read = in.read(buf, offset, len - offset); - if (read < 0) { - throw new EOFException( - "Unexpected end of stream: still needed " + (len - offset) + " bytes"); - } - offset += read; - } - } } diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java index 93a6c8deb4..04dbeed23e 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java @@ -19,9 +19,11 @@ package org.apache.parquet.column.values.rle; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertArrayEquals; import java.io.ByteArrayInputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.bytes.DirectByteBufferAllocator; @@ -298,6 +300,49 @@ public void testGroupBoundary() throws Exception { assertEquals(stream.available(), 0); } + @Test + public void testTruncatedPackedRunAfterFullPackedRunDoesNotReuseStaleBytes() throws Exception { + int bitWidth = 3; + BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth); + + int[] firstRunValues = new int[8]; + Arrays.fill(firstRunValues, 7); + byte[] firstRunPacked = new byte[bitWidth]; + packer.pack8Values(firstRunValues, 0, firstRunPacked, 0); + + int[] secondRunValues = {1, 2, 3, 4, 5, 6, 7, 0}; + byte[] secondRunPacked = new byte[bitWidth]; + packer.pack8Values(secondRunValues, 0, secondRunPacked, 0); + + byte[] encoded = { + (byte) ((1 << 1) | 1), + firstRunPacked[0], + firstRunPacked[1], + firstRunPacked[2], + (byte) ((1 << 1) | 1), + secondRunPacked[0] + }; + + RunLengthBitPackingHybridDecoder decoder = + new RunLengthBitPackingHybridDecoder(bitWidth, new ByteArrayInputStream(encoded)); + + for (int ignored = 0; ignored < 8; ignored++) { + assertEquals(7, decoder.readInt()); + } + + int[] actualSecondRun = new int[8]; + for (int i = 0; i < 8; i++) { + actualSecondRun[i] = decoder.readInt(); + } + + byte[] expectedSecondPacked = new byte[bitWidth]; + expectedSecondPacked[0] = secondRunPacked[0]; + int[] expectedSecondRun = new int[8]; + packer.unpack8Values(expectedSecondPacked, 0, expectedSecondRun, 0); + + assertArrayEquals(expectedSecondRun, actualSecondRun); + } + private static List unpack(int bitWidth, int numValues, ByteArrayInputStream is) throws Exception { BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);