List:       hadoop-commits
Subject:    [2/2] hadoop git commit: HADOOP-13578. Add Codec for ZStandard Compression. Contributed by churro morales
From:       jlowe@apache.org
Date:       2017-01-04 16:20:37
Message-ID: abdc5385a1264731ba9dc4df4bfae0c9@git.apache.org
HADOOP-13578. Add Codec for ZStandard Compression. Contributed by churro morales
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit:  http://git-wip-us.apache.org/repos/asf/hadoop/commit/db947fb8
Tree:    http://git-wip-us.apache.org/repos/asf/hadoop/tree/db947fb8
Diff:    http://git-wip-us.apache.org/repos/asf/hadoop/diff/db947fb8
Branch: refs/heads/branch-2
Commit: db947fb8704c2098a7d70216ba6d34e65b070379
Parents: 3207762
Author:    Jason Lowe <[email protected]>
Authored:  Wed Jan 4 14:46:44 2017 +0000
Committer: Jason Lowe <[email protected]>
Committed: Wed Jan 4 14:46:44 2017 +0000
----------------------------------------------------------------------
BUILDING.txt | 24 +
hadoop-common-project/hadoop-common/pom.xml | 21 +
.../hadoop-common/src/CMakeLists.txt | 27 ++
.../hadoop-common/src/config.h.cmake | 1 +
.../hadoop/fs/CommonConfigurationKeys.java | 16 +
.../apache/hadoop/io/compress/Decompressor.java | 2 +-
.../hadoop/io/compress/ZStandardCodec.java | 242 +++++++++
.../io/compress/zstd/ZStandardCompressor.java | 305 ++++++++++++
.../io/compress/zstd/ZStandardDecompressor.java | 323 ++++++++++++
.../hadoop/io/compress/zstd/package-info.java | 22 +
.../apache/hadoop/util/NativeCodeLoader.java | 5 +
.../hadoop/util/NativeLibraryChecker.java | 12 +-
.../io/compress/zstd/ZStandardCompressor.c | 259 ++++++++++
.../io/compress/zstd/ZStandardDecompressor.c | 218 +++++++++
.../zstd/org_apache_hadoop_io_compress_zstd.h | 34 ++
.../org/apache/hadoop/util/NativeCodeLoader.c | 10 +
...g.apache.hadoop.io.compress.CompressionCodec | 2 +-
.../src/site/markdown/NativeLibraries.md.vm | 1 +
.../apache/hadoop/io/compress/TestCodec.java | 15 +-
.../io/compress/TestCompressionStreamReuse.java | 8 +
.../TestZStandardCompressorDecompressor.java | 485 +++++++++++++++++++
.../src/test/resources/zstd/test_file.txt | 71 +++
.../src/test/resources/zstd/test_file.txt.zst | Bin 0 -> 3690 bytes
hadoop-project-dist/pom.xml | 6 +
hadoop-project/pom.xml | 2 +
25 files changed, 2107 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/BUILDING.txt
----------------------------------------------------------------------
diff --git a/BUILDING.txt b/BUILDING.txt
index 2f1f9f8..e596504 100644
--- a/BUILDING.txt
+++ b/BUILDING.txt
@@ -77,6 +77,8 @@ Optional packages:
$ sudo apt-get install libjansson-dev
* Linux FUSE
$ sudo apt-get install fuse libfuse-dev
+* ZStandard compression
+ $ sudo apt-get install zstd
----------------------------------------------------------------------------------
Maven main modules:
@@ -148,6 +150,28 @@ Maven build goals:
and it ignores the -Dsnappy.prefix option. If -Dsnappy.lib isn't given, the
bundling and building will fail.
+ ZStandard build options:
+ ZStandard is a compression library that can be utilized by the native code.
+ It is currently an optional component, meaning that Hadoop can be built with
+ or without this dependency.
+ * Use -Drequire.zstd to fail the build if libzstd.so is not found.
+    If this option is not specified and the zstd library is missing,
+    we silently build a version of libhadoop.so that cannot make use of zstd.
+ * Use -Dzstd.prefix to specify a nonstandard location for the libzstd
+ header files and library files. You do not need this option if you have
+ installed zstandard using a package manager.
+ * Use -Dzstd.lib to specify a nonstandard location for the libzstd library
+ files. Similarly to zstd.prefix, you do not need this option if you have
+ installed using a package manager.
+ * Use -Dbundle.zstd to copy the contents of the zstd.lib directory into
+ the final tar file. This option requires that -Dzstd.lib is also given,
+ and it ignores the -Dzstd.prefix option. If -Dzstd.lib isn't given, the
+ bundling and building will fail.
OpenSSL build options:
OpenSSL includes a crypto library that can be utilized by the native code.
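
For reference, the new ZStandard options above can be combined into a full
native build that fails fast when libzstd is absent (a sketch; the
-Dzstd.prefix value is only an example for a non-standard install location):

  $ mvn package -Pdist,native -DskipTests -Dtar -Drequire.zstd -Dzstd.prefix=/usr/local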
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml
index ddc85b8..ee7acb3 100644
--- a/hadoop-common-project/hadoop-common/pom.xml
+++ b/hadoop-common-project/hadoop-common/pom.xml
@@ -514,6 +514,10 @@
<snappy.lib></snappy.lib>
<snappy.include></snappy.include>
<require.snappy>false</require.snappy>
+ <zstd.prefix></zstd.prefix>
+ <zstd.lib></zstd.lib>
+ <zstd.include></zstd.include>
+ <require.zstd>false</require.zstd>
<openssl.prefix></openssl.prefix>
<openssl.lib></openssl.lib>
<openssl.include></openssl.include>
@@ -568,6 +572,8 @@
<javahClassName>org.apache.hadoop.security.JniBasedUnixGroupsNetgroupMapping</javahClassName>
<javahClassName>org.apache.hadoop.io.compress.snappy.SnappyCompressor</javahClassName>
<javahClassName>org.apache.hadoop.io.compress.snappy.SnappyDecompressor</javahClassName>
+            <javahClassName>org.apache.hadoop.io.compress.zstd.ZStandardCompressor</javahClassName>
+            <javahClassName>org.apache.hadoop.io.compress.zstd.ZStandardDecompressor</javahClassName>
<javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Compressor</javahClassName>
<javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Decompressor</javahClassName>
                    <javahClassName>org.apache.hadoop.crypto.OpensslCipher</javahClassName>
@@ -599,6 +605,10 @@
                    <CUSTOM_SNAPPY_PREFIX>${snappy.prefix}</CUSTOM_SNAPPY_PREFIX>
                    <CUSTOM_SNAPPY_LIB>${snappy.lib}</CUSTOM_SNAPPY_LIB>
                    <CUSTOM_SNAPPY_INCLUDE>${snappy.include}</CUSTOM_SNAPPY_INCLUDE>
+                    <REQUIRE_ZSTD>${require.zstd}</REQUIRE_ZSTD>
+                    <CUSTOM_ZSTD_PREFIX>${zstd.prefix}</CUSTOM_ZSTD_PREFIX>
+                    <CUSTOM_ZSTD_LIB>${zstd.lib}</CUSTOM_ZSTD_LIB>
+                    <CUSTOM_ZSTD_INCLUDE>${zstd.include}</CUSTOM_ZSTD_INCLUDE>
                    <REQUIRE_OPENSSL>${require.openssl}</REQUIRE_OPENSSL>
                    <CUSTOM_OPENSSL_PREFIX>${openssl.prefix}</CUSTOM_OPENSSL_PREFIX>
                    <CUSTOM_OPENSSL_LIB>${openssl.lib}</CUSTOM_OPENSSL_LIB>
@@ -636,6 +646,11 @@
<snappy.include></snappy.include>
<require.snappy>false</require.snappy>
<bundle.snappy.in.bin>true</bundle.snappy.in.bin>
+ <zstd.prefix></zstd.prefix>
+ <zstd.lib></zstd.lib>
+ <zstd.include></zstd.include>
+        <require.zstd>false</require.zstd>
+ <bundle.zstd.in.bin>true</bundle.zstd.in.bin>
<openssl.prefix></openssl.prefix>
<openssl.lib></openssl.lib>
<openssl.include></openssl.include>
@@ -685,6 +700,8 @@
<javahClassName>org.apache.hadoop.security.JniBasedUnixGroupsNetgroupMapping</javahClassName>
<javahClassName>org.apache.hadoop.io.compress.snappy.SnappyCompressor</javahClassName>
<javahClassName>org.apache.hadoop.io.compress.snappy.SnappyDecompressor</javahClassName>
+ \
<javahClassName>org.apache.hadoop.io.compress.zstd.ZStandardCompressor</javahClassName>
+ \
<javahClassName>org.apache.hadoop.io.compress.zstd.ZStandardDecompressor</javahClassName>
<javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Compressor</javahClassName>
<javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Decompressor</javahClassName>
                    <javahClassName>org.apache.hadoop.crypto.OpensslCipher</javahClassName>
@@ -736,6 +753,10 @@
                        <argument>/p:CustomSnappyLib=${snappy.lib}</argument>
                        <argument>/p:CustomSnappyInclude=${snappy.include}</argument>
                        <argument>/p:RequireSnappy=${require.snappy}</argument>
+                        <argument>/p:CustomZstdPrefix=${zstd.prefix}</argument>
+                        <argument>/p:CustomZstdLib=${zstd.lib}</argument>
+                        <argument>/p:CustomZstdInclude=${zstd.include}</argument>
+                        <argument>/p:RequireZstd=${require.zstd}</argument>
<argument>/p:CustomOpensslPrefix=${openssl.prefix}</argument>
<argument>/p:CustomOpensslLib=${openssl.lib}</argument>
<argument>/p:CustomOpensslInclude=${openssl.include}</argument>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/CMakeLists.txt b/hadoop-common-project/hadoop-common/src/CMakeLists.txt
index c93bfe7..3e52643 100644
--- a/hadoop-common-project/hadoop-common/src/CMakeLists.txt
+++ b/hadoop-common-project/hadoop-common/src/CMakeLists.txt
@@ -94,6 +94,31 @@ else()
endif()
endif()
+# Require Zstandard
+SET(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES})
+hadoop_set_find_shared_library_version("1")
+find_library(ZSTD_LIBRARY
+ NAMES zstd
+ PATHS ${CUSTOM_ZSTD_PREFIX} ${CUSTOM_ZSTD_PREFIX}/lib
+ ${CUSTOM_ZSTD_PREFIX}/lib64 ${CUSTOM_ZSTD_LIB})
+SET(CMAKE_FIND_LIBRARY_SUFFIXES ${STORED_CMAKE_FIND_LIBRARY_SUFFIXES})
+find_path(ZSTD_INCLUDE_DIR
+ NAMES zstd.h
+ PATHS ${CUSTOM_ZSTD_PREFIX} ${CUSTOM_ZSTD_PREFIX}/include
+ ${CUSTOM_ZSTD_INCLUDE})
+if (ZSTD_LIBRARY AND ZSTD_INCLUDE_DIR)
+ GET_FILENAME_COMPONENT(HADOOP_ZSTD_LIBRARY ${ZSTD_LIBRARY} NAME)
+ set(ZSTD_SOURCE_FILES
+ "${SRC}/io/compress/zstd/ZStandardCompressor.c"
+ "${SRC}/io/compress/zstd/ZStandardDecompressor.c")
+else (ZSTD_LIBRARY AND ZSTD_INCLUDE_DIR)
+ set(ZSTD_INCLUDE_DIR "")
+ set(ZSTD_SOURCE_FILES "")
+ IF(REQUIRE_ZSTD)
+    MESSAGE(FATAL_ERROR "Required zstandard library could not be found.  ZSTD_LIBRARY=${ZSTD_LIBRARY}, ZSTD_INCLUDE_DIR=${ZSTD_INCLUDE_DIR}, CUSTOM_ZSTD_INCLUDE_DIR=${CUSTOM_ZSTD_INCLUDE_DIR}, CUSTOM_ZSTD_PREFIX=${CUSTOM_ZSTD_PREFIX}, CUSTOM_ZSTD_INCLUDE=${CUSTOM_ZSTD_INCLUDE}")
+  ENDIF(REQUIRE_ZSTD)
+endif (ZSTD_LIBRARY AND ZSTD_INCLUDE_DIR)
# Build hardware CRC32 acceleration, if supported on the platform.
 if(CMAKE_SYSTEM_PROCESSOR MATCHES "^i.86$" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "amd64")
   set(BULK_CRC_ARCH_SOURCE_FIlE "${SRC}/util/bulk_crc32_x86.c")
@@ -169,6 +194,7 @@ include_directories(
${ZLIB_INCLUDE_DIRS}
${BZIP2_INCLUDE_DIR}
${SNAPPY_INCLUDE_DIR}
+ ${ZSTD_INCLUDE_DIR}
${OPENSSL_INCLUDE_DIR}
${SRC}/util
@@ -182,6 +208,7 @@ hadoop_add_dual_library(hadoop
${SRC}/io/compress/lz4/lz4.c
${SRC}/io/compress/lz4/lz4hc.c
${SNAPPY_SOURCE_FILES}
+ ${ZSTD_SOURCE_FILES}
${OPENSSL_SOURCE_FILES}
${SRC}/io/compress/zlib/ZlibCompressor.c
${SRC}/io/compress/zlib/ZlibDecompressor.c
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/config.h.cmake
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/config.h.cmake b/hadoop-common-project/hadoop-common/src/config.h.cmake
index d71271d..f2e1586 100644
--- a/hadoop-common-project/hadoop-common/src/config.h.cmake
+++ b/hadoop-common-project/hadoop-common/src/config.h.cmake
@@ -21,6 +21,7 @@
#cmakedefine HADOOP_ZLIB_LIBRARY "@HADOOP_ZLIB_LIBRARY@"
#cmakedefine HADOOP_BZIP2_LIBRARY "@HADOOP_BZIP2_LIBRARY@"
#cmakedefine HADOOP_SNAPPY_LIBRARY "@HADOOP_SNAPPY_LIBRARY@"
+#cmakedefine HADOOP_ZSTD_LIBRARY "@HADOOP_ZSTD_LIBRARY@"
#cmakedefine HADOOP_OPENSSL_LIBRARY "@HADOOP_OPENSSL_LIBRARY@"
#cmakedefine HAVE_SYNC_FILE_RANGE
#cmakedefine HAVE_POSIX_FADVISE
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index ac634ab..49062ea 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -141,6 +141,22 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   public static final int IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT =
       256 * 1024;
+ /** ZStandard compression level. */
+ public static final String IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY =
+ "io.compression.codec.zstd.level";
+ /** Default value for IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY. */
+ public static final int IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT = 3;
+ /** ZStandard buffer size. */
+ public static final String IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY =
+ "io.compression.codec.zstd.buffersize";
+  /** ZStandard buffer size. A value of 0 means use the buffer size
+   * recommended by the zstd library. */
+ public static final int
+ IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT = 0;
/** Internal buffer size for Lz4 compressor/decompressors */
public static final String IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY =
"io.compression.codec.lz4.buffersize";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Decompressor.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Decompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Decompressor.java
index 8cb0b2a..3808003 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Decompressor.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Decompressor.java
@@ -95,7 +95,7 @@ public interface Decompressor {
* @param b Buffer for the compressed data
* @param off Start offset of the data
* @param len Size of the buffer
- * @return The actual number of bytes of compressed data.
+ * @return The actual number of bytes of uncompressed data.
   * @throws IOException
   */
  public int decompress(byte[] b, int off, int len) throws IOException;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java
new file mode 100644
index 0000000..11e98a1
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.compress.zstd.ZStandardCompressor;
+import org.apache.hadoop.io.compress.zstd.ZStandardDecompressor;
+import org.apache.hadoop.util.NativeCodeLoader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY;
+/**
+ * This class creates zstd compressors/decompressors.
+ */
+public class ZStandardCodec implements
+ Configurable, CompressionCodec, DirectDecompressionCodec {
+ private Configuration conf;
+ /**
+ * Set the configuration to be used by this object.
+ *
+ * @param conf the configuration object.
+ */
+ @Override
+ public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+ /**
+ * Return the configuration used by this object.
+ *
+ * @return the configuration object used by this object.
+ */
+ @Override
+ public Configuration getConf() {
+    return conf;
+  }
+ public static void checkNativeCodeLoaded() {
+ if (!NativeCodeLoader.isNativeCodeLoaded() ||
+ !NativeCodeLoader.buildSupportsZstd()) {
+ throw new RuntimeException("native zStandard library "
+ + "not available: this version of libhadoop was built "
+ + "without zstd support.");
+ }
+ if (!ZStandardCompressor.isNativeCodeLoaded()) {
+ throw new RuntimeException("native zStandard library not "
+ + "available: ZStandardCompressor has not been loaded.");
+ }
+ if (!ZStandardDecompressor.isNativeCodeLoaded()) {
+ throw new RuntimeException("native zStandard library not "
+ + "available: ZStandardDecompressor has not been loaded.");
+    }
+  }
+ public static boolean isNativeCodeLoaded() {
+ return ZStandardCompressor.isNativeCodeLoaded()
+        && ZStandardDecompressor.isNativeCodeLoaded();
+  }
+ public static String getLibraryName() {
+    return ZStandardCompressor.getLibraryName();
+  }
+ public static int getCompressionLevel(Configuration conf) {
+ return conf.getInt(
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT);
+  }
+ public static int getCompressionBufferSize(Configuration conf) {
+ int bufferSize = getBufferSize(conf);
+ return bufferSize == 0 ?
+ ZStandardCompressor.getRecommendedBufferSize() :
+        bufferSize;
+  }
+ public static int getDecompressionBufferSize(Configuration conf) {
+ int bufferSize = getBufferSize(conf);
+ return bufferSize == 0 ?
+ ZStandardDecompressor.getRecommendedBufferSize() :
+        bufferSize;
+  }
+ private static int getBufferSize(Configuration conf) {
+ return conf.getInt(IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY,
+        IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT);
+  }
+ /**
+ * Create a {@link CompressionOutputStream} that will write to the given
+ * {@link OutputStream}.
+ *
+ * @param out the location for the final output stream
+ * @return a stream the user can write uncompressed data to have compressed
+ * @throws IOException
+ */
+ @Override
+ public CompressionOutputStream createOutputStream(OutputStream out)
+ throws IOException {
+ return Util.
+        createOutputStreamWithCodecPool(this, conf, out);
+  }
+ /**
+ * Create a {@link CompressionOutputStream} that will write to the given
+ * {@link OutputStream} with the given {@link Compressor}.
+ *
+ * @param out the location for the final output stream
+ * @param compressor compressor to use
+ * @return a stream the user can write uncompressed data to have compressed
+ * @throws IOException
+ */
+ @Override
+ public CompressionOutputStream createOutputStream(OutputStream out,
+ Compressor compressor)
+ throws IOException {
+ checkNativeCodeLoaded();
+ return new CompressorStream(out, compressor,
+        getCompressionBufferSize(conf));
+  }
+ /**
+ * Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
+ *
+ * @return the type of compressor needed by this codec.
+ */
+ @Override
+ public Class<? extends Compressor> getCompressorType() {
+ checkNativeCodeLoaded();
+    return ZStandardCompressor.class;
+  }
+ /**
+ * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
+ *
+ * @return a new compressor for use by this codec
+ */
+ @Override
+ public Compressor createCompressor() {
+ checkNativeCodeLoaded();
+ return new ZStandardCompressor(
+        getCompressionLevel(conf), getCompressionBufferSize(conf));
+  }
+ /**
+ * Create a {@link CompressionInputStream} that will read from the given
+ * input stream.
+ *
+ * @param in the stream to read compressed bytes from
+ * @return a stream to read uncompressed bytes from
+ * @throws IOException
+ */
+ @Override
+ public CompressionInputStream createInputStream(InputStream in)
+ throws IOException {
+ return Util.
+        createInputStreamWithCodecPool(this, conf, in);
+  }
+ /**
+ * Create a {@link CompressionInputStream} that will read from the given
+ * {@link InputStream} with the given {@link Decompressor}.
+ *
+ * @param in the stream to read compressed bytes from
+ * @param decompressor decompressor to use
+ * @return a stream to read uncompressed bytes from
+ * @throws IOException
+ */
+ @Override
+ public CompressionInputStream createInputStream(InputStream in,
+ Decompressor decompressor)
+ throws IOException {
+ checkNativeCodeLoaded();
+ return new DecompressorStream(in, decompressor,
+        getDecompressionBufferSize(conf));
+  }
+ /**
+ * Get the type of {@link Decompressor} needed by
+ * this {@link CompressionCodec}.
+ *
+ * @return the type of decompressor needed by this codec.
+ */
+ @Override
+ public Class<? extends Decompressor> getDecompressorType() {
+ checkNativeCodeLoaded();
+    return ZStandardDecompressor.class;
+  }
+ /**
+ * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
+ *
+ * @return a new decompressor for use by this codec
+ */
+ @Override
+ public Decompressor createDecompressor() {
+ checkNativeCodeLoaded();
+    return new ZStandardDecompressor(getDecompressionBufferSize(conf));
+  }
+ /**
+ * Get the default filename extension for this kind of compression.
+ *
+ * @return <code>.zst</code>.
+ */
+ @Override
+ public String getDefaultExtension() {
+    return ".zst";
+  }
+ @Override
+ public DirectDecompressor createDirectDecompressor() {
+ return new ZStandardDecompressor.ZStandardDirectDecompressor(
+ getDecompressionBufferSize(conf)
+    );
+  }
+}
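
A minimal usage sketch for the codec above (the file names, the IOUtils copy,
and the 4096-byte buffer are illustrative, not part of the patch; this only
works when libhadoop was built with zstd support, per checkNativeCodeLoaded()):

  Configuration conf = new Configuration();
  ZStandardCodec codec = new ZStandardCodec();
  codec.setConf(conf);
  // Write plain bytes; the stream produces zstd-framed output.
  try (CompressionOutputStream out =
           codec.createOutputStream(new FileOutputStream("data.zst"))) {
    out.write(data);
  }
  // Read the compressed file back as uncompressed bytes.
  try (CompressionInputStream in =
           codec.createInputStream(new FileInputStream("data.zst"))) {
    IOUtils.copyBytes(in, System.out, 4096, false);
  }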
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java
new file mode 100644
index 0000000..eb2121a
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress.zstd;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.ZStandardCodec;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+/**
+ * A {@link Compressor} based on the zStandard compression algorithm.
+ * https://github.com/facebook/zstd
+ */
+public class ZStandardCompressor implements Compressor {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ZStandardCompressor.class);
+ private long stream;
+ private int level;
+ private int directBufferSize;
+ private byte[] userBuf = null;
+ private int userBufOff = 0, userBufLen = 0;
+ private ByteBuffer uncompressedDirectBuf = null;
+ private int uncompressedDirectBufOff = 0, uncompressedDirectBufLen = 0;
+ private boolean keepUncompressedBuf = false;
+ private ByteBuffer compressedDirectBuf = null;
+ private boolean finish, finished;
+ private long bytesRead = 0;
+ private long bytesWritten = 0;
+ private static boolean nativeZStandardLoaded = false;
+ static {
+ if (NativeCodeLoader.isNativeCodeLoaded()) {
+ try {
+ // Initialize the native library
+ initIDs();
+ nativeZStandardLoaded = true;
+ } catch (Throwable t) {
+ LOG.warn("Error loading zstandard native libraries: " + t);
+ }
+    }
+  }
+ public static boolean isNativeCodeLoaded() {
+    return nativeZStandardLoaded;
+  }
+ public static int getRecommendedBufferSize() {
+    return getStreamSize();
+  }
+ @VisibleForTesting
+ ZStandardCompressor() {
+ this(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT,
+        CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
+  }
+ /**
+ * Creates a new compressor with the default compression level.
+ * Compressed data will be generated in ZStandard format.
+ */
+ public ZStandardCompressor(int level, int bufferSize) {
+    this(level, bufferSize, bufferSize);
+  }
+ @VisibleForTesting
+ ZStandardCompressor(int level, int inputBufferSize, int outputBufferSize) {
+ this.level = level;
+ stream = create();
+ this.directBufferSize = outputBufferSize;
+ uncompressedDirectBuf = ByteBuffer.allocateDirect(inputBufferSize);
+ compressedDirectBuf = ByteBuffer.allocateDirect(outputBufferSize);
+ compressedDirectBuf.position(outputBufferSize);
+    reset();
+  }
+ /**
+ * Prepare the compressor to be used in a new stream with settings defined in
+ * the given Configuration. It will reset the compressor's compression level
+ * and compression strategy.
+ *
+ * @param conf Configuration storing new settings
+ */
+ @Override
+ public void reinit(Configuration conf) {
+ if (conf == null) {
+ return;
+ }
+ level = ZStandardCodec.getCompressionLevel(conf);
+ reset();
+    LOG.debug("Reinit compressor with new compression configuration");
+  }
+ @Override
+ public void setInput(byte[] b, int off, int len) {
+ if (b == null) {
+ throw new NullPointerException();
+ }
+ if (off < 0 || len < 0 || off > b.length - len) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+ this.userBuf = b;
+ this.userBufOff = off;
+ this.userBufLen = len;
+ uncompressedDirectBufOff = 0;
+ setInputFromSavedData();
+ compressedDirectBuf.limit(directBufferSize);
+    compressedDirectBuf.position(directBufferSize);
+  }
+ //copy enough data from userBuf to uncompressedDirectBuf
+ private void setInputFromSavedData() {
+ int len = Math.min(userBufLen, uncompressedDirectBuf.remaining());
+ uncompressedDirectBuf.put(userBuf, userBufOff, len);
+ userBufLen -= len;
+ userBufOff += len;
+    uncompressedDirectBufLen = uncompressedDirectBuf.position();
+  }
+ @Override
+ public void setDictionary(byte[] b, int off, int len) {
+ throw new UnsupportedOperationException(
+ "Dictionary support is not enabled");
+ @Override
+ public boolean needsInput() {
+ // Consume remaining compressed data?
+ if (compressedDirectBuf.remaining() > 0) {
+ return false;
+ }
+ // have we consumed all input
+ if (keepUncompressedBuf && uncompressedDirectBufLen > 0) {
+ return false;
+ }
+ if (uncompressedDirectBuf.remaining() > 0) {
+ // Check if we have consumed all user-input
+ if (userBufLen <= 0) {
+ return true;
+ } else {
+ // copy enough data from userBuf to uncompressedDirectBuf
+ setInputFromSavedData();
+ // uncompressedDirectBuf is not full
+ return uncompressedDirectBuf.remaining() > 0;
+ }
+ }
+    return false;
+  }
+ @Override
+ public void finish() {
+    finish = true;
+  }
+ @Override
+ public boolean finished() {
+ // Check if 'zstd' says its 'finished' and all compressed
+ // data has been consumed
+    return (finished && compressedDirectBuf.remaining() == 0);
+  }
+ @Override
+ public int compress(byte[] b, int off, int len) throws IOException {
+ checkStream();
+ if (b == null) {
+ throw new NullPointerException();
+ }
+ if (off < 0 || len < 0 || off > b.length - len) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+ // Check if there is compressed data
+ int n = compressedDirectBuf.remaining();
+ if (n > 0) {
+ n = Math.min(n, len);
+ compressedDirectBuf.get(b, off, n);
+ return n;
+ }
+ // Re-initialize the output direct buffer
+ compressedDirectBuf.rewind();
+ compressedDirectBuf.limit(directBufferSize);
+ // Compress data
+ n = deflateBytesDirect(
+ uncompressedDirectBuf,
+ uncompressedDirectBufOff,
+ uncompressedDirectBufLen,
+ compressedDirectBuf,
+ directBufferSize
+ );
+ compressedDirectBuf.limit(n);
+ // Check if we have consumed all input buffer
+ if (uncompressedDirectBufLen <= 0) {
+ // consumed all input buffer
+ keepUncompressedBuf = false;
+ uncompressedDirectBuf.clear();
+ uncompressedDirectBufOff = 0;
+ uncompressedDirectBufLen = 0;
+ } else {
+ // did not consume all input buffer
+ keepUncompressedBuf = true;
+ }
+ // Get at most 'len' bytes
+ n = Math.min(n, len);
+ compressedDirectBuf.get(b, off, n);
+    return n;
+  }
+ /**
+ * Returns the total number of compressed bytes output so far.
+ *
+ * @return the total (non-negative) number of compressed bytes output so far
+ */
+ @Override
+ public long getBytesWritten() {
+ checkStream();
+    return bytesWritten;
+  }
+ /**
+ * <p>Returns the total number of uncompressed bytes input so far.</p>
+ *
+ * @return the total (non-negative) number of uncompressed bytes input so far
+ */
+ @Override
+ public long getBytesRead() {
+ checkStream();
+    return bytesRead;
+  }
+ @Override
+ public void reset() {
+ checkStream();
+ init(level, stream);
+ finish = false;
+ finished = false;
+ bytesRead = 0;
+ bytesWritten = 0;
+ uncompressedDirectBuf.rewind();
+ uncompressedDirectBufOff = 0;
+ uncompressedDirectBufLen = 0;
+ keepUncompressedBuf = false;
+ compressedDirectBuf.limit(directBufferSize);
+ compressedDirectBuf.position(directBufferSize);
+ userBufOff = 0;
+    userBufLen = 0;
+  }
+ @Override
+ public void end() {
+ if (stream != 0) {
+ end(stream);
+ stream = 0;
+    }
+  }
+ private void checkStream() {
+ if (stream == 0) {
+ throw new NullPointerException();
+    }
+  }
+ private native static long create();
+ private native static void init(int level, long stream);
+ private native int deflateBytesDirect(ByteBuffer src, int srcOffset,
+ int srcLen, ByteBuffer dst, int dstLen);
+ private static native int getStreamSize();
+ private native static void end(long strm);
+ private native static void initIDs();
+  public native static String getLibraryName();
+}
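
The setInput()/finish()/compress() methods above implement the standard Hadoop
Compressor contract, so the class can be driven with the usual one-shot loop
(a sketch; 'sink' is any OutputStream and the buffer size is arbitrary):

  byte[] buffer = new byte[64 * 1024];
  compressor.setInput(input, 0, input.length);
  compressor.finish();
  while (!compressor.finished()) {
    int n = compressor.compress(buffer, 0, buffer.length);
    sink.write(buffer, 0, n);
  }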
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java
new file mode 100644
index 0000000..73d73e1
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress.zstd;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressor;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+/**
+ * A {@link Decompressor} based on the zStandard compression algorithm.
+ * https://github.com/facebook/zstd
+ */
+public class ZStandardDecompressor implements Decompressor {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ZStandardDecompressor.class);
+ private long stream;
+ private int directBufferSize;
+ private ByteBuffer compressedDirectBuf = null;
+ private int compressedDirectBufOff, bytesInCompressedBuffer;
+ private ByteBuffer uncompressedDirectBuf = null;
+ private byte[] userBuf = null;
+ private int userBufOff = 0, userBufferBytesToConsume = 0;
+ private boolean finished;
+ private int remaining = 0;
+ private static boolean nativeZStandardLoaded = false;
+ static {
+ if (NativeCodeLoader.isNativeCodeLoaded()) {
+ try {
+ // Initialize the native library
+ initIDs();
+ nativeZStandardLoaded = true;
+ } catch (Throwable t) {
+ LOG.warn("Error loading zstandard native libraries: " + t);
+ }
+    }
+  }
+ public static boolean isNativeCodeLoaded() {
+    return nativeZStandardLoaded;
+  }
+ public static int getRecommendedBufferSize() {
+    return getStreamSize();
+  }
+ public ZStandardDecompressor() {
+    this(getStreamSize());
+  }
+ /**
+ * Creates a new decompressor.
+ */
+ public ZStandardDecompressor(int bufferSize) {
+ this.directBufferSize = bufferSize;
+ compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+ uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
+ uncompressedDirectBuf.position(directBufferSize);
+ stream = create();
+    reset();
+  }
+ @Override
+ public void setInput(byte[] b, int off, int len) {
+ if (b == null) {
+ throw new NullPointerException();
+ }
+ if (off < 0 || len < 0 || off > b.length - len) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+ this.userBuf = b;
+ this.userBufOff = off;
+ this.userBufferBytesToConsume = len;
+ setInputFromSavedData();
+ uncompressedDirectBuf.limit(directBufferSize);
+    uncompressedDirectBuf.position(directBufferSize);
+  }
+ private void setInputFromSavedData() {
+ compressedDirectBufOff = 0;
+ bytesInCompressedBuffer = userBufferBytesToConsume;
+ if (bytesInCompressedBuffer > directBufferSize) {
+ bytesInCompressedBuffer = directBufferSize;
+ }
+ compressedDirectBuf.rewind();
+ compressedDirectBuf.put(
+ userBuf, userBufOff, bytesInCompressedBuffer);
+ userBufOff += bytesInCompressedBuffer;
+    userBufferBytesToConsume -= bytesInCompressedBuffer;
+  }
+ // dictionary is not supported
+ @Override
+ public void setDictionary(byte[] b, int off, int len) {
+ throw new UnsupportedOperationException(
+ "Dictionary support is not enabled");
+ @Override
+ public boolean needsInput() {
+ // Consume remaining compressed data?
+ if (uncompressedDirectBuf.remaining() > 0) {
+ return false;
+ }
+ // Check if we have consumed all input
+ if (bytesInCompressedBuffer - compressedDirectBufOff <= 0) {
+ // Check if we have consumed all user-input
+ if (userBufferBytesToConsume <= 0) {
+ return true;
+ } else {
+ setInputFromSavedData();
+ }
+ }
+    return false;
+  }
+ // dictionary is not supported.
+ @Override
+ public boolean needsDictionary() {
+    return false;
+  }
+ @Override
+ public boolean finished() {
+ // finished == true if ZSTD_decompressStream() returns 0
+ // also check we have nothing left in our buffer
+    return (finished && uncompressedDirectBuf.remaining() == 0);
+  }
+ @Override
+ public int decompress(byte[] b, int off, int len)
+ throws IOException {
+ checkStream();
+ if (b == null) {
+ throw new NullPointerException();
+ }
+ if (off < 0 || len < 0 || off > b.length - len) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+ // Check if there is uncompressed data
+ int n = uncompressedDirectBuf.remaining();
+ if (n > 0) {
+ return populateUncompressedBuffer(b, off, len, n);
+ }
+ // Re-initialize the output direct buffer
+ uncompressedDirectBuf.rewind();
+ uncompressedDirectBuf.limit(directBufferSize);
+ // Decompress data
+ n = inflateBytesDirect(
+ compressedDirectBuf,
+ compressedDirectBufOff,
+ bytesInCompressedBuffer,
+ uncompressedDirectBuf,
+ 0,
+ directBufferSize
+ );
+ uncompressedDirectBuf.limit(n);
+ // Get at most 'len' bytes
+    return populateUncompressedBuffer(b, off, len, n);
+  }
+ /**
+ * <p>Returns the number of bytes remaining in the input buffers;
+ * normally called when finished() is true to determine amount of post-stream
+ * data.</p>
+ *
+ * @return the total (non-negative) number of unprocessed bytes in input
+ */
+ @Override
+ public int getRemaining() {
+ checkStream();
+ // userBuf + compressedDirectBuf
+    return userBufferBytesToConsume + remaining;
+  }
+ /**
+ * Resets everything including the input buffers (user and direct).
+ */
+ @Override
+ public void reset() {
+ checkStream();
+ init(stream);
+ remaining = 0;
+ finished = false;
+ compressedDirectBufOff = 0;
+ bytesInCompressedBuffer = 0;
+ uncompressedDirectBuf.limit(directBufferSize);
+ uncompressedDirectBuf.position(directBufferSize);
+ userBufOff = 0;
+    userBufferBytesToConsume = 0;
+  }
+ @Override
+ public void end() {
+ if (stream != 0) {
+ free(stream);
+ stream = 0;
+    }
+  }
+ @Override
+ protected void finalize() {
+    reset();
+  }
+ private void checkStream() {
+ if (stream == 0) {
+ throw new NullPointerException("Stream not initialized");
+    }
+  }
+ private int populateUncompressedBuffer(byte[] b, int off, int len, int n) {
+ n = Math.min(n, len);
+ uncompressedDirectBuf.get(b, off, n);
+    return n;
+  }
+ private native static void initIDs();
+ private native static long create();
+ private native static void init(long stream);
+ private native int inflateBytesDirect(ByteBuffer src, int srcOffset,
+ int srcLen, ByteBuffer dst, int dstOffset, int dstLen);
+ private native static void free(long strm);
+ private native static int getStreamSize();
+ int inflateDirect(ByteBuffer src, ByteBuffer dst) throws IOException {
+ assert
+ (this instanceof ZStandardDecompressor.ZStandardDirectDecompressor);
+ int originalPosition = dst.position();
+ int n = inflateBytesDirect(
+ src, src.position(), src.remaining(), dst, dst.position(),
+ dst.remaining()
+ );
+ dst.position(originalPosition + n);
+ if (bytesInCompressedBuffer > 0) {
+ src.position(compressedDirectBufOff);
+ } else {
+ src.position(src.limit());
+ }
+    return n;
+  }
+  /**
+   * A {@link DirectDecompressor} for ZStandard
+   * https://github.com/facebook/zstd.
+   */
+ public static class ZStandardDirectDecompressor
+ extends ZStandardDecompressor implements DirectDecompressor {
+ public ZStandardDirectDecompressor(int directBufferSize) {
+ super(directBufferSize);
+ }
+ @Override
+ public boolean finished() {
+ return (endOfInput && super.finished());
+ }
+ @Override
+ public void reset() {
+ super.reset();
+ endOfInput = true;
+ }
+ private boolean endOfInput;
+ @Override
+ public void decompress(ByteBuffer src, ByteBuffer dst)
+ throws IOException {
+ assert dst.isDirect() : "dst.isDirect()";
+ assert src.isDirect() : "src.isDirect()";
+ assert dst.remaining() > 0 : "dst.remaining() > 0";
+ this.inflateDirect(src, dst);
+ endOfInput = !src.hasRemaining();
+ }
+ @Override
+ public void setDictionary(byte[] b, int off, int len) {
+ throw new UnsupportedOperationException(
+ "byte[] arrays are not supported for DirectDecompressor");
+ }
+ @Override
+ public int decompress(byte[] b, int off, int len) {
+ throw new UnsupportedOperationException(
+ "byte[] arrays are not supported for DirectDecompressor");
+    }
+  }
+}
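
The ZStandardDirectDecompressor above operates entirely on direct ByteBuffers.
A hypothetical round trip (buffer sizes are illustrative):

  ZStandardDecompressor.ZStandardDirectDecompressor dd =
      new ZStandardDecompressor.ZStandardDirectDecompressor(64 * 1024);
  ByteBuffer src = ByteBuffer.allocateDirect(compressed.length);
  src.put(compressed);
  src.flip();
  ByteBuffer dst = ByteBuffer.allocateDirect(64 * 1024);
  dd.decompress(src, dst);
  dst.flip();  // dst now holds the uncompressed bytes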
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/package-info.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/package-info.java
new file mode 100644
index 0000000..9069070
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+package org.apache.hadoop.io.compress.zstd;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java
index 533fc07..ff5803c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java
@@ -80,6 +80,11 @@ public class NativeCodeLoader {
public static native boolean buildSupportsSnappy();
+  /**
+   * Returns true only if this build was compiled with support for ZStandard.
+   */
+  public static native boolean buildSupportsZstd();
+
   /**
    * Returns true only if this build was compiled with support for openssl.
    */
   public static native boolean buildSupportsOpenssl();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java
index d8c6899..c3ffe58 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.OpensslCipher;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.io.compress.ZStandardCodec;
import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -65,6 +66,7 @@ public class NativeLibraryChecker {
boolean nativeHadoopLoaded = NativeCodeLoader.isNativeCodeLoaded();
boolean zlibLoaded = false;
boolean snappyLoaded = false;
+ boolean zStdLoaded = false;
// lz4 is linked within libhadoop
boolean lz4Loaded = nativeHadoopLoaded;
boolean bzip2Loaded = Bzip2Factory.isNativeBzip2Loaded(conf);
@@ -75,6 +77,7 @@ public class NativeLibraryChecker {
String hadoopLibraryName = "";
String zlibLibraryName = "";
String snappyLibraryName = "";
+ String zstdLibraryName = "";
String lz4LibraryName = "";
String bzip2LibraryName = "";
String winutilsPath = null;
@@ -90,6 +93,11 @@ public class NativeLibraryChecker {
if (snappyLoaded && NativeCodeLoader.buildSupportsSnappy()) {
      snappyLibraryName = SnappyCodec.getLibraryName();
    }
+ zStdLoaded = NativeCodeLoader.buildSupportsZstd() &&
+ ZStandardCodec.isNativeCodeLoaded();
+ if (zStdLoaded && NativeCodeLoader.buildSupportsZstd()) {
+ zstdLibraryName = ZStandardCodec.getLibraryName();
+ }
if (OpensslCipher.getLoadingFailureReason() != null) {
openSslDetail = OpensslCipher.getLoadingFailureReason();
openSslLoaded = false;
@@ -122,6 +130,7 @@ public class NativeLibraryChecker {
System.out.printf("hadoop: %b %s%n", nativeHadoopLoaded, hadoopLibraryName);
System.out.printf("zlib: %b %s%n", zlibLoaded, zlibLibraryName);
System.out.printf("snappy: %b %s%n", snappyLoaded, snappyLibraryName);
+ System.out.printf("zstd : %b %s%n", zStdLoaded, zstdLibraryName);
System.out.printf("lz4: %b %s%n", lz4Loaded, lz4LibraryName);
System.out.printf("bzip2: %b %s%n", bzip2Loaded, bzip2LibraryName);
System.out.printf("openssl: %b %s%n", openSslLoaded, openSslDetail);
@@ -130,7 +139,8 @@ public class NativeLibraryChecker {
if ((!nativeHadoopLoaded) || (Shell.WINDOWS && (!winutilsExists)) ||
- (checkAll && !(zlibLoaded && snappyLoaded && lz4Loaded && bzip2Loaded))) {
+ (checkAll && !(zlibLoaded && snappyLoaded && lz4Loaded && bzip2Loaded
+ && zStdLoaded))) {
      // return 1 to indicate check failed
ExitUtil.terminate(1);
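
With this change, "hadoop checknative -a" reports zstd alongside the other
native codecs; illustrative output (the library path will vary by system):

  $ hadoop checknative -a
  ...
  snappy: true /usr/lib/x86_64-linux-gnu/libsnappy.so.1
  zstd : true /usr/lib/x86_64-linux-gnu/libzstd.so.1
  ...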
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c
new file mode 100644
index 0000000..04f2a3e
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "org_apache_hadoop_io_compress_zstd.h"
+#if defined HADOOP_ZSTD_LIBRARY
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#ifdef UNIX
+#include <dlfcn.h>
+#include "config.h"
+#endif
+#include "org_apache_hadoop_io_compress_zstd_ZStandardCompressor.h"
+static jfieldID ZStandardCompressor_stream;
+static jfieldID ZStandardCompressor_uncompressedDirectBufOff;
+static jfieldID ZStandardCompressor_uncompressedDirectBufLen;
+static jfieldID ZStandardCompressor_directBufferSize;
+static jfieldID ZStandardCompressor_finish;
+static jfieldID ZStandardCompressor_finished;
+static jfieldID ZStandardCompressor_bytesWritten;
+static jfieldID ZStandardCompressor_bytesRead;
+#ifdef UNIX
+static size_t (*dlsym_ZSTD_CStreamInSize)(void);
+static size_t (*dlsym_ZSTD_CStreamOutSize)(void);
+static ZSTD_CStream* (*dlsym_ZSTD_createCStream)(void);
+static size_t (*dlsym_ZSTD_initCStream)(ZSTD_CStream*, int);
+static size_t (*dlsym_ZSTD_freeCStream)(ZSTD_CStream*);
+static size_t (*dlsym_ZSTD_compressStream)(ZSTD_CStream*, ZSTD_outBuffer*, ZSTD_inBuffer*);
+static size_t (*dlsym_ZSTD_endStream)(ZSTD_CStream*, ZSTD_outBuffer*);
+static size_t (*dlsym_ZSTD_flushStream)(ZSTD_CStream*, ZSTD_outBuffer*);
+static unsigned (*dlsym_ZSTD_isError)(size_t);
+static const char * (*dlsym_ZSTD_getErrorName)(size_t);
+#endif
+#ifdef WINDOWS
+typedef size_t (__cdecl *__dlsym_ZSTD_CStreamInSize)(void);
+typedef size_t (__cdecl *__dlsym_ZSTD_CStreamOutSize)(void);
+typedef ZSTD_CStream* (__cdecl *__dlsym_ZSTD_createCStream)(void);
+typedef size_t (__cdecl *__dlsym_ZSTD_initCStream)(ZSTD_CStream*, int);
+typedef size_t (__cdecl *__dlsym_ZSTD_freeCStream)(ZSTD_CStream*);
+typedef size_t (__cdecl *__dlsym_ZSTD_compressStream)(ZSTD_CStream*, ZSTD_outBuffer*, ZSTD_inBuffer*);
+typedef size_t (__cdecl *__dlsym_ZSTD_endStream)(ZSTD_CStream*, ZSTD_outBuffer*);
+typedef size_t (__cdecl *__dlsym_ZSTD_flushStream)(ZSTD_CStream*, ZSTD_outBuffer*);
+typedef unsigned (__cdecl *__dlsym_ZSTD_isError)(size_t);
+typedef const char * (__cdecl *__dlsym_ZSTD_getErrorName)(size_t);
+
+static __dlsym_ZSTD_CStreamInSize dlsym_ZSTD_CStreamInSize;
+static __dlsym_ZSTD_CStreamOutSize dlsym_ZSTD_CStreamOutSize;
+static __dlsym_ZSTD_createCStream dlsym_ZSTD_createCStream;
+static __dlsym_ZSTD_initCStream dlsym_ZSTD_initCStream;
+static __dlsym_ZSTD_freeCStream dlsym_ZSTD_freeCStream;
+static __dlsym_ZSTD_compressStream dlsym_ZSTD_compressStream;
+static __dlsym_ZSTD_endStream dlsym_ZSTD_endStream;
+static __dlsym_ZSTD_flushStream dlsym_ZSTD_flushStream;
+static __dlsym_ZSTD_isError dlsym_ZSTD_isError;
+static __dlsym_ZSTD_getErrorName dlsym_ZSTD_getErrorName;
+#endif
+// Load the libzstd.so from disk
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_initIDs
+(JNIEnv *env, jclass clazz) {
+#ifdef UNIX
+ // Load libzstd.so
+ void *libzstd = dlopen(HADOOP_ZSTD_LIBRARY, RTLD_LAZY | RTLD_GLOBAL);
+ if (!libzstd) {
+    char* msg = (char*)malloc(10000);
+    snprintf(msg, 10000, "%s (%s)!", "Cannot load " HADOOP_ZSTD_LIBRARY, dlerror());
+    THROW(env, "java/lang/InternalError", msg);
+ return;
+ }
+#endif
+#ifdef WINDOWS
+ HMODULE libzstd = LoadLibrary(HADOOP_ZSTD_LIBRARY);
+ if (!libzstd) {
+ THROW(env, "java/lang/UnsatisfiedLinkError", "Cannot load zstd.dll");
+ return;
+ }
+#endif
+#ifdef UNIX
+ // load dynamic symbols
+ dlerror();
+  LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_CStreamInSize, env, libzstd, "ZSTD_CStreamInSize");
+  LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_CStreamOutSize, env, libzstd, "ZSTD_CStreamOutSize");
+  LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_createCStream, env, libzstd, "ZSTD_createCStream");
+  LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_initCStream, env, libzstd, "ZSTD_initCStream");
+  LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_freeCStream, env, libzstd, "ZSTD_freeCStream");
+  LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_compressStream, env, libzstd, "ZSTD_compressStream");
+  LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_endStream, env, libzstd, "ZSTD_endStream");
+  LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream");
+  LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError");
+  LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName");
+#endif
+
+#ifdef WINDOWS
+  LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_CStreamInSize, dlsym_ZSTD_CStreamInSize, env, libzstd, "ZSTD_CStreamInSize");
+  LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_CStreamOutSize, dlsym_ZSTD_CStreamOutSize, env, libzstd, "ZSTD_CStreamOutSize");
+  LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_createCStream, dlsym_ZSTD_createCStream, env, libzstd, "ZSTD_createCStream");
+  LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_initCStream, dlsym_ZSTD_initCStream, env, libzstd, "ZSTD_initCStream");
+  LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_freeCStream, dlsym_ZSTD_freeCStream, env, libzstd, "ZSTD_freeCStream");
+  LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_compressStream, dlsym_ZSTD_compressStream, env, libzstd, "ZSTD_compressStream");
+  LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_endStream, dlsym_ZSTD_endStream, env, libzstd, "ZSTD_endStream");
+  LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_flushStream, dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream");
+  LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_isError, dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError");
+  LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_getErrorName, dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName");
+#endif
+ // load fields
+ ZStandardCompressor_stream = (*env)->GetFieldID(env, clazz, "stream", "J");
+ ZStandardCompressor_finish = (*env)->GetFieldID(env, clazz, "finish", "Z");
+ ZStandardCompressor_finished = (*env)->GetFieldID(env, clazz, "finished", "Z");
+  ZStandardCompressor_uncompressedDirectBufOff = (*env)->GetFieldID(env, clazz, "uncompressedDirectBufOff", "I");
+  ZStandardCompressor_uncompressedDirectBufLen = (*env)->GetFieldID(env, clazz, "uncompressedDirectBufLen", "I");
+  ZStandardCompressor_directBufferSize = (*env)->GetFieldID(env, clazz, "directBufferSize", "I");
+  ZStandardCompressor_bytesRead = (*env)->GetFieldID(env, clazz, "bytesRead", "J");
+  ZStandardCompressor_bytesWritten = (*env)->GetFieldID(env, clazz, "bytesWritten", "J");
+}
+// Create the compression stream
+JNIEXPORT jlong JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_create
+(JNIEnv *env, jobject this) {
+  ZSTD_CStream* const stream = dlsym_ZSTD_createCStream();
+  if (stream == NULL) {
+    THROW(env, "java/lang/InternalError", "Error creating the stream");
+    return (jlong)0;
+  }
+  return (jlong) stream;
+}
+// Initialize the compression stream
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_init
+(JNIEnv *env, jobject this, jint level, jlong stream) {
+  size_t result = dlsym_ZSTD_initCStream((ZSTD_CStream *) stream, level);
+  if (dlsym_ZSTD_isError(result)) {
+    THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result));
+    return;
+  }
+}
+// free the compression stream
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_end
+(JNIEnv *env, jobject this, jlong stream) {
+  size_t result = dlsym_ZSTD_freeCStream((ZSTD_CStream *) stream);
+  if (dlsym_ZSTD_isError(result)) {
+    THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result));
+    return;
+  }
+}
+JNIEXPORT jint Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_deflateBytesDirect
+(JNIEnv *env, jobject this, jobject uncompressed_direct_buf, jint uncompressed_direct_buf_off, jint uncompressed_direct_buf_len, jobject compressed_direct_buf, jint compressed_direct_buf_len) {
+  ZSTD_CStream* const stream = (ZSTD_CStream*) (*env)->GetLongField(env, this, ZStandardCompressor_stream);
+  if (!stream) {
+    THROW(env, "java/lang/NullPointerException", NULL);
+    return (jint)0;
+  }
+
+  jlong bytes_read = (*env)->GetLongField(env, this, ZStandardCompressor_bytesRead);
+  jlong bytes_written = (*env)->GetLongField(env, this, ZStandardCompressor_bytesWritten);
+  jboolean finish = (*env)->GetBooleanField(env, this, ZStandardCompressor_finish);
+
+  // Get the input direct buffer
+  void * uncompressed_bytes = (*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
+  if (!uncompressed_bytes) {
+    THROW(env, "java/lang/InternalError", "Undefined memory address for uncompressedDirectBuf");
+    return (jint) 0;
+  }
+
+  // Get the output direct buffer
+  void * compressed_bytes = (*env)->GetDirectBufferAddress(env, compressed_direct_buf);
+  if (!compressed_bytes) {
+    THROW(env, "java/lang/InternalError", "Undefined memory address for compressedDirectBuf");
+    return (jint) 0;
+  }
+
+  ZSTD_inBuffer input = { uncompressed_bytes, uncompressed_direct_buf_len, uncompressed_direct_buf_off };
+  ZSTD_outBuffer output = { compressed_bytes, compressed_direct_buf_len, 0 };
+
+  size_t size = dlsym_ZSTD_compressStream(stream, &output, &input);
+  if (dlsym_ZSTD_isError(size)) {
+    THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(size));
+    return (jint) 0;
+  }
+  if (finish && input.pos == input.size) {
+    // end the stream, flush and write the frame epilogue
+    size = dlsym_ZSTD_endStream(stream, &output);
+    if (!size) {
+      (*env)->SetBooleanField(env, this, ZStandardCompressor_finished, JNI_TRUE);
+    }
+  } else {
+    // need to flush the output buffer
+    // this also updates the output buffer position.
+    size = dlsym_ZSTD_flushStream(stream, &output);
+  }
+  if (dlsym_ZSTD_isError(size)) {
+    THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(size));
+    return (jint) 0;
+  }
+  bytes_read += input.pos;
+  bytes_written += output.pos;
+  (*env)->SetLongField(env, this, ZStandardCompressor_bytesRead, bytes_read);
+  (*env)->SetLongField(env, this, ZStandardCompressor_bytesWritten, bytes_written);
+
+  (*env)->SetIntField(env, this, ZStandardCompressor_uncompressedDirectBufOff, input.pos);
+  (*env)->SetIntField(env, this, ZStandardCompressor_uncompressedDirectBufLen, input.size - input.pos);
+  return (jint) output.pos;
+}
+JNIEXPORT jstring JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_getLibraryName
+(JNIEnv *env, jclass class) {
+#ifdef UNIX
+  if (dlsym_ZSTD_isError) {
+    Dl_info dl_info;
+    if (dladdr( dlsym_ZSTD_isError, &dl_info)) {
+      return (*env)->NewStringUTF(env, dl_info.dli_fname);
+    }
+  }
+  return (*env)->NewStringUTF(env, HADOOP_ZSTD_LIBRARY);
+#endif
+#ifdef WINDOWS
+  LPWSTR filename = NULL;
+  GetLibraryName(dlsym_ZSTD_isError, &filename);
+  if (filename != NULL) {
+    return (*env)->NewString(env, filename, (jsize) wcslen(filename));
+  } else {
+    return (*env)->NewStringUTF(env, "Unavailable");
+  }
+#endif
+}
+
+// returns the max size of the recommended input and output buffers
+JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_getStreamSize
+(JNIEnv *env, jobject this) {
+  int x = (int) dlsym_ZSTD_CStreamInSize();
+  int y = (int) dlsym_ZSTD_CStreamOutSize();
+  return (x >= y) ? x : y;
+}
+#endif //define HADOOP_ZSTD_LIBRARY
\ No newline at end of file
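
For readers following the JNI code above: deflateBytesDirect drives libzstd's streaming API one buffer pair at a time, and ZSTD_endStream writes the frame epilogue once all input has been consumed. The sketch below shows the same compress-then-end sequence against libzstd directly; it is a minimal illustration only (not part of the patch), assuming the library is linked normally rather than dlopen'd and that dst has at least ZSTD_compressBound(src_len) bytes of capacity:

    #include <zstd.h>

    /* Illustrative one-shot streaming compression; mirrors the
     * compressStream/endStream sequence in deflateBytesDirect above.
     * Returns 0 on success and fills *dst_len with the frame size. */
    static int compress_frame(const void *src, size_t src_len,
                              void *dst, size_t dst_cap, size_t *dst_len) {
      ZSTD_CStream *cs = ZSTD_createCStream();
      if (cs == NULL) return -1;
      if (ZSTD_isError(ZSTD_initCStream(cs, 3))) {        /* level 3 */
        ZSTD_freeCStream(cs);
        return -1;
      }
      ZSTD_inBuffer in = { src, src_len, 0 };    /* pos advances as input is read */
      ZSTD_outBuffer out = { dst, dst_cap, 0 };  /* pos advances as output is written */
      while (in.pos < in.size) {
        if (ZSTD_isError(ZSTD_compressStream(cs, &out, &in))) {
          ZSTD_freeCStream(cs);
          return -1;
        }
      }
      /* write the frame epilogue; a return of 0 means fully flushed,
       * which is when the JNI code above sets its "finished" flag */
      size_t remaining = ZSTD_endStream(cs, &out);
      ZSTD_freeCStream(cs);
      if (ZSTD_isError(remaining) || remaining != 0) return -1;
      *dst_len = out.pos;
      return 0;
    }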
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.c
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.c
new file mode 100644
index 0000000..1236756
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.c
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "org_apache_hadoop_io_compress_zstd.h"
+#if defined HADOOP_ZSTD_LIBRARY
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#ifdef UNIX
+#include <dlfcn.h>
+#include "config.h"
+#endif
+#include "org_apache_hadoop_io_compress_zstd_ZStandardDecompressor.h"
+static jfieldID ZStandardDecompressor_stream;
+static jfieldID ZStandardDecompressor_compressedDirectBufOff;
+static jfieldID ZStandardDecompressor_bytesInCompressedBuffer;
+static jfieldID ZStandardDecompressor_directBufferSize;
+static jfieldID ZStandardDecompressor_finished;
+static jfieldID ZStandardDecompressor_remaining;
+#ifdef UNIX
+static size_t (*dlsym_ZSTD_DStreamOutSize)(void);
+static size_t (*dlsym_ZSTD_DStreamInSize)(void);
+static ZSTD_DStream* (*dlsym_ZSTD_createDStream)(void);
+static size_t (*dlsym_ZSTD_initDStream)(ZSTD_DStream*);
+static size_t (*dlsym_ZSTD_freeDStream)(ZSTD_DStream*);
+static size_t (*dlsym_ZSTD_resetDStream)(ZSTD_DStream*);
+static size_t (*dlsym_ZSTD_decompressStream)(ZSTD_DStream*, ZSTD_outBuffer*, ZSTD_inBuffer*);
+static size_t (*dlsym_ZSTD_flushStream)(ZSTD_CStream*, ZSTD_outBuffer*);
+static unsigned (*dlsym_ZSTD_isError)(size_t);
+static const char * (*dlsym_ZSTD_getErrorName)(size_t);
+#endif
+#ifdef WINDOWS
+typedef size_t (__cdecl *__dlsym_ZSTD_DStreamOutSize)(void);
+typedef size_t (__cdecl *__dlsym_ZSTD_DStreamInSize)(void);
+typedef ZSTD_DStream* (__cdecl *__dlsym_ZSTD_createDStream)(void);
+typedef size_t (__cdecl *__dlsym_ZSTD_initDStream)(ZSTD_DStream*);
+typedef size_t (__cdecl *__dlsym_ZSTD_freeDStream)(ZSTD_DStream*);
+typedef size_t (__cdecl *__dlsym_ZSTD_resetDStream)(ZSTD_DStream*);
+typedef size_t (__cdecl *__dlsym_ZSTD_decompressStream)(ZSTD_DStream*, ZSTD_outBuffer*, ZSTD_inBuffer*);
+typedef size_t (__cdecl *__dlsym_ZSTD_flushStream)(ZSTD_CStream*, ZSTD_outBuffer*);
+typedef unsigned (__cdecl *__dlsym_ZSTD_isError)(size_t);
+typedef const char * (__cdecl *__dlsym_ZSTD_getErrorName)(size_t);
+
+static __dlsym_ZSTD_DStreamOutSize dlsym_ZSTD_DStreamOutSize;
+static __dlsym_ZSTD_DStreamInSize dlsym_ZSTD_DStreamInSize;
+static __dlsym_ZSTD_createDStream dlsym_ZSTD_createDStream;
+static __dlsym_ZSTD_initDStream dlsym_ZSTD_initDStream;
+static __dlsym_ZSTD_freeDStream dlsym_ZSTD_freeDStream;
+static __dlsym_ZSTD_resetDStream dlsym_ZSTD_resetDStream;
+static __dlsym_ZSTD_decompressStream dlsym_ZSTD_decompressStream;
+static __dlsym_ZSTD_isError dlsym_ZSTD_isError;
+static __dlsym_ZSTD_getErrorName dlsym_ZSTD_getErrorName;
+static __dlsym_ZSTD_flushStream dlsym_ZSTD_flushStream;
+#endif
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_initIDs (JNIEnv *env, jclass clazz) {
+  // Load libzstd.so
+#ifdef UNIX
+  void *libzstd = dlopen(HADOOP_ZSTD_LIBRARY, RTLD_LAZY | RTLD_GLOBAL);
+  if (!libzstd) {
+    char* msg = (char*)malloc(1000);
+    snprintf(msg, 1000, "%s (%s)!", "Cannot load " HADOOP_ZSTD_LIBRARY, dlerror());
+    THROW(env, "java/lang/UnsatisfiedLinkError", msg);
+    return;
+  }
+#endif
+#ifdef WINDOWS
+ HMODULE libzstd = LoadLibrary(HADOOP_ZSTD_LIBRARY);
+ if (!libzstd) {
+ THROW(env, "java/lang/UnsatisfiedLinkError", "Cannot load zstd.dll");
+ return;
+ }
+#endif
+#ifdef UNIX
+  dlerror();
+  LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_DStreamOutSize, env, libzstd, "ZSTD_DStreamOutSize");
+  LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_DStreamInSize, env, libzstd, "ZSTD_DStreamInSize");
+  LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_createDStream, env, libzstd, "ZSTD_createDStream");
+  LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_initDStream, env, libzstd, "ZSTD_initDStream");
+  LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_freeDStream, env, libzstd, "ZSTD_freeDStream");
+  LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_resetDStream, env, libzstd, "ZSTD_resetDStream");
+  LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_decompressStream, env, libzstd, "ZSTD_decompressStream");
+  LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError");
+  LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName");
+  LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream");
+#endif
+#ifdef WINDOWS
+  LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_DStreamOutSize, dlsym_ZSTD_DStreamOutSize, env, libzstd, "ZSTD_DStreamOutSize");
+  LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_DStreamInSize, dlsym_ZSTD_DStreamInSize, env, libzstd, "ZSTD_DStreamInSize");
+  LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_createDStream, dlsym_ZSTD_createDStream, env, libzstd, "ZSTD_createDStream");
+  LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_initDStream, dlsym_ZSTD_initDStream, env, libzstd, "ZSTD_initDStream");
+  LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_freeDStream, dlsym_ZSTD_freeDStream, env, libzstd, "ZSTD_freeDStream");
+  LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_resetDStream, dlsym_ZSTD_resetDStream, env, libzstd, "ZSTD_resetDStream");
+  LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_decompressStream, dlsym_ZSTD_decompressStream, env, libzstd, "ZSTD_decompressStream");
+  LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_isError, dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError");
+  LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_getErrorName, dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName");
+  LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_flushStream, dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream");
+#endif
+  ZStandardDecompressor_stream = (*env)->GetFieldID(env, clazz, "stream", "J");
+  ZStandardDecompressor_finished = (*env)->GetFieldID(env, clazz, "finished", "Z");
+  ZStandardDecompressor_compressedDirectBufOff = (*env)->GetFieldID(env, clazz, "compressedDirectBufOff", "I");
+  ZStandardDecompressor_bytesInCompressedBuffer = (*env)->GetFieldID(env, clazz, "bytesInCompressedBuffer", "I");
+  ZStandardDecompressor_directBufferSize = (*env)->GetFieldID(env, clazz, "directBufferSize", "I");
+  ZStandardDecompressor_remaining = (*env)->GetFieldID(env, clazz, "remaining", "I");
+}
+
+JNIEXPORT jlong JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_create(JNIEnv *env, jobject this) {
+  ZSTD_DStream * stream = dlsym_ZSTD_createDStream();
+  if (stream == NULL) {
+    THROW(env, "java/lang/InternalError", "Error creating stream");
+    return (jlong) 0;
+  }
+  return (jlong) stream;
+}
+
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_init(JNIEnv *env, jobject this, jlong stream) {
+  size_t result = dlsym_ZSTD_initDStream((ZSTD_DStream *) stream);
+  if (dlsym_ZSTD_isError(result)) {
+    THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result));
+    return;
+  }
+  // "remaining" is declared as a jint ("I") above, so use SetIntField
+  (*env)->SetIntField(env, this, ZStandardDecompressor_remaining, 0);
+}
+
+JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_free(JNIEnv *env, jclass obj, jlong stream) {
+  size_t result = dlsym_ZSTD_freeDStream((ZSTD_DStream *) stream);
+  if (dlsym_ZSTD_isError(result)) {
+    THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result));
+    return;
+  }
+}
+
+JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_inflateBytesDirect
+(JNIEnv *env, jobject this, jobject compressed_direct_buf, jint compressed_direct_buf_off, jint compressed_direct_buf_len, jobject uncompressed_direct_buf, jint uncompressed_direct_buf_off, jint uncompressed_direct_buf_len) {
+  ZSTD_DStream *stream = (ZSTD_DStream *) (*env)->GetLongField(env, this, ZStandardDecompressor_stream);
+  if (!stream) {
+    THROW(env, "java/lang/NullPointerException", NULL);
+    return (jint)0;
+  }
+  // Get the input direct buffer
+  void * compressed_bytes = (*env)->GetDirectBufferAddress(env, compressed_direct_buf);
+  if (!compressed_bytes) {
+    THROW(env, "java/lang/InternalError", "Undefined memory address for compressedDirectBuf");
+    return (jint) 0;
+  }
+  // Get the output direct buffer
+  void * uncompressed_bytes = (*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
+  if (!uncompressed_bytes) {
+    THROW(env, "java/lang/InternalError", "Undefined memory address for uncompressedDirectBuf");
+    return (jint) 0;
+  }
+  uncompressed_bytes = ((char*) uncompressed_bytes) + uncompressed_direct_buf_off;
+  ZSTD_inBuffer input = { compressed_bytes, compressed_direct_buf_len, compressed_direct_buf_off };
+  ZSTD_outBuffer output = { uncompressed_bytes, uncompressed_direct_buf_len, 0 };
+
+  size_t const size = dlsym_ZSTD_decompressStream(stream, &output, &input);
+  // check for errors
+  if (dlsym_ZSTD_isError(size)) {
+    THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(size));
+    return (jint) 0;
+  }
+  int remaining = input.size - input.pos;
+  (*env)->SetIntField(env, this, ZStandardDecompressor_remaining, remaining);
+  // the entire frame has been decoded
+  if (size == 0) {
+    (*env)->SetBooleanField(env, this, ZStandardDecompressor_finished, JNI_TRUE);
+    size_t result = dlsym_ZSTD_initDStream(stream);
+    if (dlsym_ZSTD_isError(result)) {
+      THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result));
+      return (jint) 0;
+    }
+  }
+  (*env)->SetIntField(env, this, ZStandardDecompressor_compressedDirectBufOff, input.pos);
+  (*env)->SetIntField(env, this, ZStandardDecompressor_bytesInCompressedBuffer, input.size);
+  return (jint) output.pos;
+}
+
+// returns the max size of the recommended input and output buffers
+JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_getStreamSize
+(JNIEnv *env, jclass obj) {
+  int x = (int) dlsym_ZSTD_DStreamInSize();
+  int y = (int) dlsym_ZSTD_DStreamOutSize();
+  return (x >= y) ? x : y;
+}
+#endif //define HADOOP_ZSTD_LIBRARY
\ No newline at end of file
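
The decompressor side relies on one subtle zstd contract visible in inflateBytesDirect above: ZSTD_decompressStream returns 0 exactly when a frame has been fully decoded, which is the point where the JNI code sets "finished" and re-initializes the stream for a possible next frame. A minimal direct-linked sketch of that loop (illustrative only, assuming the whole frame fits in dst):

    #include <zstd.h>

    /* Illustrative streaming decompression of a single complete frame.
     * ret == 0 from ZSTD_decompressStream signals end-of-frame. */
    static int decompress_frame(const void *src, size_t src_len,
                                void *dst, size_t dst_cap, size_t *dst_len) {
      ZSTD_DStream *ds = ZSTD_createDStream();
      if (ds == NULL) return -1;
      if (ZSTD_isError(ZSTD_initDStream(ds))) {
        ZSTD_freeDStream(ds);
        return -1;
      }
      ZSTD_inBuffer in = { src, src_len, 0 };
      ZSTD_outBuffer out = { dst, dst_cap, 0 };
      size_t ret = 1;                     /* nonzero: frame not finished */
      while (in.pos < in.size && ret != 0) {
        ret = ZSTD_decompressStream(ds, &out, &in);
        if (ZSTD_isError(ret)) {
          ZSTD_freeDStream(ds);
          return -1;
        }
      }
      ZSTD_freeDStream(ds);
      if (ret != 0) return -1;            /* input ended mid-frame */
      *dst_len = out.pos;
      return 0;
    }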
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/org_apache_hadoop_io_compress_zstd.h
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/org_apache_hadoop_io_compress_zstd.h b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/org_apache_hadoop_io_compress_zstd.h
new file mode 100644
index 0000000..78fc0a4
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/org_apache_hadoop_io_compress_zstd.h
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ORG_APACHE_HADOOP_IO_COMPRESS_ZSTD_ZSTD_H
+#define ORG_APACHE_HADOOP_IO_COMPRESS_ZSTD_ZSTD_H
+#include "org_apache_hadoop.h"
+#ifdef UNIX
+#include <dlfcn.h>
+#endif
+#include <jni.h>
+#include <zstd.h>
+#include <stddef.h>
+#endif //ORG_APACHE_HADOOP_IO_COMPRESS_ZSTD_ZSTD_H
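
The header pulls in <dlfcn.h> because, as the initIDs functions above show, the codec binds libzstd at runtime rather than at link time. Stripped of Hadoop's LOAD_DYNAMIC_SYMBOL macro and THROW plumbing, the pattern is plain dlopen/dlsym; a standalone sketch follows ("libzstd.so.1" is a typical soname standing in for the build-time HADOOP_ZSTD_LIBRARY constant; build with -ldl on older glibc):

    #include <dlfcn.h>
    #include <stdio.h>

    /* Hand-rolled version of the runtime-binding pattern: open the
     * library with dlopen, then resolve each entry point with dlsym,
     * checking dlerror() after every lookup. */
    typedef unsigned (*zstd_is_error_fn)(size_t);

    int main(void) {
      void *lib = dlopen("libzstd.so.1", RTLD_LAZY | RTLD_GLOBAL);
      if (!lib) {
        fprintf(stderr, "cannot load libzstd: %s\n", dlerror());
        return 1;
      }
      dlerror(); /* clear any stale error before dlsym */
      zstd_is_error_fn is_error = (zstd_is_error_fn) dlsym(lib, "ZSTD_isError");
      const char *err = dlerror();
      if (err != NULL || is_error == NULL) {
        fprintf(stderr, "cannot resolve ZSTD_isError: %s\n", err ? err : "unknown");
        return 1;
      }
      printf("ZSTD_isError(0) = %u\n", is_error(0));
      return 0;
    }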
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c
index 3625112..704f40c 100644
--- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c
@@ -39,6 +39,16 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_util_NativeCodeLoader_buildSup
 #endif
 
+JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_util_NativeCodeLoader_buildSupportsZstd
+  (JNIEnv *env, jclass clazz)
+{
+#ifdef HADOOP_ZSTD_LIBRARY
+  return JNI_TRUE;
+#else
+  return JNI_FALSE;
+#endif
+}
+
 JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_util_NativeCodeLoader_buildSupportsOpenssl
   (JNIEnv *env, jclass clazz)
 {
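
buildSupportsZstd follows the same convention as the existing snappy and openssl probes: HADOOP_ZSTD_LIBRARY is defined at build time only when the zstd headers and library were found, so the JNI method compiles down to a constant answer. A self-contained illustration of the gate (FEATURE_X is a hypothetical stand-in for the real macro):

    #include <stdio.h>

    /* Illustrative stand-in for the HADOOP_ZSTD_LIBRARY gate: the build
     * system defines the macro when the optional dependency is detected,
     * and the capability probe reduces to a compile-time constant.
     * Compile with -DFEATURE_X to flip the answer. */
    static int build_supports_feature_x(void) {
    #ifdef FEATURE_X
      return 1;
    #else
      return 0;
    #endif
    }

    int main(void) {
      printf("feature x compiled in: %d\n", build_supports_feature_x());
      return 0;
    }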
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec b/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec
index df46e32..568972e 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec
+++ b/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec
@@ -17,4 +17,4 @@ org.apache.hadoop.io.compress.DeflateCodec
org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.Lz4Codec
org.apache.hadoop.io.compress.SnappyCodec
+org.apache.hadoop.io.compress.ZStandardCodec
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/site/markdown/NativeLibraries.md.vm
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/NativeLibraries.md.vm b/hadoop-common-project/hadoop-common/src/site/markdown/NativeLibraries.md.vm
index 04ff426..e4f720c 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/NativeLibraries.md.vm
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/NativeLibraries.md.vm
@@ -118,6 +118,7 @@ NativeLibraryChecker is a tool to check whether native libraries are loaded corr
    hadoop: true /home/ozawa/hadoop/lib/native/libhadoop.so.1.0.0
zlib: true /lib/x86_64-linux-gnu/libz.so.1
snappy: true /usr/lib/libsnappy.so.1
+ zstd: true /usr/lib/libzstd.so.1
lz4: true revision:99
bzip2: false
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
index 1029517..4bb79a7 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
@@ -75,6 +75,7 @@ import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import static org.junit.Assert.*;
+import static org.junit.Assume.assumeTrue;
public class TestCodec {
@@ -519,6 +520,18 @@ public class TestCodec {
+  @Test(timeout=20000)
+  public void testSequenceFileZStandardCodec() throws Exception {
+    assumeTrue(ZStandardCodec.isNativeCodeLoaded());
+    Configuration conf = new Configuration();
+    sequenceFileCodecTest(conf, 0,
+        "org.apache.hadoop.io.compress.ZStandardCodec", 100);
+    sequenceFileCodecTest(conf, 100,
+        "org.apache.hadoop.io.compress.ZStandardCodec", 100);
+    sequenceFileCodecTest(conf, 200000,
+        "org.apache.hadoop.io.compress.ZStandardCodec", 1000000);
+  }
+
   @Test
   public void testSequenceFileDeflateCodec() throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException {
@@ -581,7 +594,7 @@ public class TestCodec {
@Test
public void testSnappyMapFile() throws Exception {
- Assume.assumeTrue(SnappyCodec.isNativeCodeLoaded());
+ assumeTrue(SnappyCodec.isNativeCodeLoaded());
codecTestMapFile(SnappyCodec.class, CompressionType.BLOCK, 100);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java
index 2d75a2d..7b55cac 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
public class TestCompressionStreamReuse {
private static final Log LOG = LogFactory
@@ -69,6 +70,13 @@ public class TestCompressionStreamReuse {
         "org.apache.hadoop.io.compress.GzipCodec");
   }
+
+  @Test
+  public void testZStandardCompressStreamReuse() throws IOException {
+    assumeTrue(ZStandardCodec.isNativeCodeLoaded());
+    resetStateTest(conf, seed, count,
+        "org.apache.hadoop.io.compress.ZStandardCodec");
+  }
+
   private void resetStateTest(Configuration conf, int seed, int count,
       String codecClass) throws IOException {
     // Create the codec