// read data from file /opt/example.parquet
String uri = "file:/opt/example.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
    BufferAllocator allocator = new RootAllocator();
    DatasetFactory datasetFactory = new FileSystemDatasetFactory(
        allocator, NativeMemoryPool.getDefault(),
        FileFormat.PARQUET, uri);
    Dataset dataset = datasetFactory.finish();
    Scanner scanner = dataset.newScan(options);
    ArrowReader reader = scanner.scanBatches()
) {
    List<ArrowRecordBatch> batches = new ArrayList<>();
    while (reader.loadNextBatch()) {
        try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
            final VectorUnloader unloader = new VectorUnloader(root);
            batches.add(unloader.getRecordBatch());
        }
    }
    // do something with read record batches, for example:
    analyzeArrowData(batches);
    // finished the analysis of the data, close all resources:
    AutoCloseables.close(batches);
} catch (Exception e) {
    e.printStackTrace();
}
ArrowRecordBatch is a low-level composite Arrow data exchange format
that doesn’t provide an API to read typed data from it directly.
It’s recommended to use the utility VectorLoader to load it into a
schema-aware container, VectorSchemaRoot, through which users can access
decoded data conveniently in Java.
The ScanOptions batchSize argument takes effect only if it is set to a value
smaller than the number of rows in the record batch.
See also
Load record batches with VectorSchemaRoot.
The schema of the data to be queried can be inspected via the method
DatasetFactory#inspect() before actually reading it. For example:
// read data from local file /opt/example.parquet
String uri = "file:/opt/example.parquet";
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
DatasetFactory factory = new FileSystemDatasetFactory(allocator,
NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
// inspect schema
Schema schema = factory.inspect();
For data formats that are compatible with a user-defined schema, users
can call the method DatasetFactory#finish(Schema schema) to create the dataset:
Schema schema = createUserSchema();
Dataset dataset = factory.finish(schema);
Otherwise, when the no-argument method DatasetFactory#finish() is called,
the schema is inferred automatically from the data source. The inferred
schema is the same as the result of DatasetFactory#inspect().
Also, if a projection is specified during scanning (see the next section,
Projection (Subset of Columns)), the actual schema of the output data can be
obtained from the method Scanner#schema():
Scanner scanner = dataset.newScan(
new ScanOptions(32768, Optional.of(new String[] {"id", "name"})));
Schema projectedSchema = scanner.schema();
Users can specify projections in ScanOptions. For example:
String[] projection = new String[] {"id", "name"};
ScanOptions options = new ScanOptions(32768, Optional.of(projection));
If no projection is needed, leave the optional projection argument absent in
ScanOptions:
ScanOptions options = new ScanOptions(32768, Optional.empty());
Or use the shortcut constructor:
ScanOptions options = new ScanOptions(32768);
Then all columns will be emitted during scanning.
Users can specify projections (new columns) or filters in ScanOptions using Substrait. For example:
// Use Substrait APIs to create an Expression and serialize to a ByteBuffer
ByteBuffer substraitExpressionFilter = getSubstraitExpressionFilter();
ByteBuffer substraitExpressionProject = getSubstraitExpressionProjection();
ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768)
    .columns(Optional.empty())
    .substraitExpressionFilter(substraitExpressionFilter)
    .substraitExpressionProjection(substraitExpressionProject)
    .build();
See also
Executing Projections and Filters Using Extended Expressions: Projections and Filters using Substrait.
FileSystemDataset supports reading data from non-local file systems. HDFS
support is included in the official Apache Arrow Java package releases and
can be used directly without rebuilding the source code.
To access HDFS data using the Dataset API, pass a general HDFS URI to
FileSystemDatasetFactory:
String uri = "hdfs://{hdfs_host}:{port}/data/example.parquet";
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
DatasetFactory factory = new FileSystemDatasetFactory(allocator,
NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
To gain better performance and reduce code complexity, the Java
FileSystemDataset internally relies on the C++
arrow::dataset::FileSystemDataset via JNI.
As a result, all Arrow data read from FileSystemDataset is
allocated off the JVM heap. To manage this memory, a utility class,
NativeMemoryPool, is provided to users.
As a basic example, by using a listenable NativeMemoryPool, users can pass
a listener that hooks into C++ buffer allocation/deallocation:
AtomicLong reserved = new AtomicLong(0L);
ReservationListener listener = new ReservationListener() {
    @Override
    public void reserve(long size) {
        reserved.getAndAdd(size);
    }

    @Override
    public void unreserve(long size) {
        reserved.getAndAdd(-size);
    }
};
NativeMemoryPool pool = NativeMemoryPool.createListenable(listener);
FileSystemDatasetFactory factory = new FileSystemDatasetFactory(allocator,
    pool, FileFormat.PARQUET, uri);
Also, it’s a very common case to reserve the same amount of JVM direct memory
for the data read from datasets. For this use case, a built-in utility
class, DirectReservationListener, is provided:
NativeMemoryPool pool = NativeMemoryPool.createListenable(
DirectReservationListener.instance());
This way, once the allocated byte count of Arrow buffers reaches the limit of
JVM direct memory, OutOfMemoryError: Direct buffer memory will
be thrown during scanning.
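To make the idea of a listener that enforces a memory limit concrete, here is a minimal pure-Java sketch. It does not use Arrow classes: the `Listener` interface is a hypothetical stand-in that only mirrors the reserve/unreserve callbacks shown above, and `CappedListener`, its fixed `limit`, and the `IllegalStateException` are assumptions made for illustration. It is not how DirectReservationListener is implemented.

```java
import java.util.concurrent.atomic.AtomicLong;

final class CappedReservationSketch {

  /** Hypothetical stand-in mirroring the reserve/unreserve callbacks. */
  interface Listener {
    void reserve(long size);

    void unreserve(long size);
  }

  /** Tracks reserved bytes and rejects reservations that exceed a fixed cap. */
  static final class CappedListener implements Listener {
    private final long limit;
    private final AtomicLong reserved = new AtomicLong(0L);

    CappedListener(long limit) {
      this.limit = limit;
    }

    @Override
    public void reserve(long size) {
      long updated = reserved.addAndGet(size);
      if (updated > limit) {
        // Roll back the failed reservation before reporting the error.
        reserved.addAndGet(-size);
        throw new IllegalStateException(
            "reservation of " + size + " bytes exceeds limit of " + limit);
      }
    }

    @Override
    public void unreserve(long size) {
      reserved.addAndGet(-size);
    }

    long reservedBytes() {
      return reserved.get();
    }
  }

  public static void main(String[] args) {
    CappedListener listener = new CappedListener(1024);
    listener.reserve(512);
    listener.reserve(256);
    listener.unreserve(256);
    System.out.println(listener.reservedBytes()); // prints 512
    try {
      listener.reserve(1024); // 512 + 1024 is over the cap
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

The rollback in `reserve` keeps the counter consistent after a rejected request, so later `unreserve` calls still balance out.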
The default instance NativeMemoryPool.getDefault() does
nothing on buffer allocation/deallocation. It’s OK to use it for
a proof of concept or testing, but for production use in complex environments,
it’s recommended to manage memory by using a listenable memory pool.
The BufferAllocator instance passed to FileSystemDatasetFactory’s
constructor is also aware of the overall memory usage of the produced
dataset instances. Once the Java buffers are created, the passed allocator
will become their parent allocator.
As another result of relying on JNI, all components related to
FileSystemDataset should be closed after use, either manually or with
try-with-resources, to release the corresponding native objects. For example:
String uri = "file:/opt/example.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
    BufferAllocator allocator = new RootAllocator();
    DatasetFactory factory = new FileSystemDatasetFactory(
        allocator, NativeMemoryPool.getDefault(),
        FileFormat.PARQUET, uri);
    Dataset dataset = factory.finish();
    Scanner scanner = dataset.newScan(options)
) {
    // do something
} catch (Exception e) {
    e.printStackTrace();
}
If users forget to close them, native objects may leak.
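The order in which try-with-resources releases objects matters when they depend on each other, as a scanner depends on its dataset. The following pure-Java sketch shows that resources are closed in the reverse order of their declaration. The `Tracked` class and the resource names are hypothetical placeholders, not Arrow classes.

```java
import java.util.ArrayList;
import java.util.List;

final class CloseOrderSketch {

  /** Hypothetical resource that records its name when it is closed. */
  static final class Tracked implements AutoCloseable {
    private final String name;
    private final List<String> log;

    Tracked(String name, List<String> log) {
      this.name = name;
      this.log = log;
    }

    @Override
    public void close() {
      log.add(name);
    }
  }

  /** Opens four placeholder resources and returns the order they were closed in. */
  static List<String> closeOrder() {
    List<String> log = new ArrayList<>();
    try (
        Tracked allocator = new Tracked("allocator", log);
        Tracked factory = new Tracked("factory", log);
        Tracked dataset = new Tracked("dataset", log);
        Tracked scanner = new Tracked("scanner", log)
    ) {
      // work with the resources here
    }
    return log;
  }

  public static void main(String[] args) {
    // prints [scanner, dataset, factory, allocator]
    System.out.println(closeOrder());
  }
}
```

Declaring the allocator first and the scanner last therefore means the scanner is released first and the allocator last.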
The batchSize argument of ScanOptions is a limit on the size of an individual batch.
For example, let’s try to read a Parquet file with gzip compression and 3 row groups:
# Let's configure ScanOptions as:
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
$ parquet-tools meta data4_3rg_gzip.parquet
file schema: schema
age: OPTIONAL INT64 R:0 D:1
name: OPTIONAL BINARY L:STRING R:0 D:1
row group 1: RC:4 TS:182 OFFSET:4
row group 2: RC:4 TS:190 OFFSET:420
row group 3: RC:3 TS:179 OFFSET:838
Here, batchSize in ScanOptions is set to 32768. The first row group has only
4 rows, so the next batch holds 4 rows, fewer than the limit, and the program
gets only 4 rows. The scanner will not combine smaller batches to reach the
limit, but it will split large batches to stay under the limit. So if a row
group had more than 32768 rows, it would be split into blocks of 32768 rows
or fewer.
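The split-but-never-combine rule can be sketched in plain Java without any Arrow classes. The helper `emittedBatchSizes` below is hypothetical, and the exact chunk boundaries chosen by the native scanner are an assumption. The sketch only models the rule described above: each row group is cut into pieces of at most batchSize rows, and pieces from different row groups are never merged.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchSizeSketch {

  /**
   * Returns the batch sizes produced when every row group is split into
   * chunks of at most batchSize rows and small chunks are never merged.
   */
  static List<Integer> emittedBatchSizes(int[] rowGroupRowCounts, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<Integer> sizes = new ArrayList<>();
    for (int rows : rowGroupRowCounts) {
      int remaining = rows;
      while (remaining > 0) {
        int chunk = Math.min(remaining, batchSize);
        sizes.add(chunk);
        remaining -= chunk;
      }
    }
    return sizes;
  }

  public static void main(String[] args) {
    int[] rowGroups = {4, 4, 3}; // row counts from the parquet-tools output
    System.out.println(emittedBatchSizes(rowGroups, 32768)); // prints [4, 4, 3]
    System.out.println(emittedBatchSizes(rowGroups, 3)); // prints [3, 1, 3, 1, 3]
  }
}
```

With the limit of 32768 the three row groups come through unchanged, while a limit of 3 splits each 4-row group into a 3-row and a 1-row batch.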
Projection (Subset of Columns)
Projection (Produce New Columns) and Filters
Read Data from HDFS
Native Memory Management
Usage Notes