添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
相关文章推荐
叛逆的沙发  ·  Convert data ...·  2 天前    · 
开朗的烈酒  ·  向光而行 ...·  2 月前    · 
爱逃课的剪刀  ·  Job Search·  6 月前    · 
咆哮的墨镜  ·  Re: Illustrator ...·  6 月前    · 
List: hadoop-commits Subject: [2/2] hadoop git commit: HADOOP-13578. Add Codec for ZStandard Compression. Contributed by churro mo From: jlowe () apache ! org Date: 2017-01-04 16:20:37 Message-ID: abdc5385a1264731ba9dc4df4bfae0c9 () git ! apache ! org [Download RAW message or body ] HADOOP-13578. Add Codec for ZStandard Compression. Contributed by churro morales Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/db947fb8 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/db947fb8 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/db947fb8 Branch: refs/heads/branch-2 Commit: db947fb8704c2098a7d70216ba6d34e65b070379 Parents: 3207762 Author: Jason Lowe <[email protected]> Authored: Wed Jan 4 14:46:44 2017 +0000 Committer: Jason Lowe <[email protected]> Committed: Wed Jan 4 14:46:44 2017 +0000 ---------------------------------------------------------------------- BUILDING.txt | 24 + hadoop-common-project/hadoop-common/pom.xml | 21 + .../hadoop-common/src/CMakeLists.txt | 27 ++ .../hadoop-common/src/config.h.cmake | 1 + .../hadoop/fs/CommonConfigurationKeys.java | 16 + .../apache/hadoop/io/compress/Decompressor.java | 2 +- .../hadoop/io/compress/ZStandardCodec.java | 242 +++++++++ .../io/compress/zstd/ZStandardCompressor.java | 305 ++++++++++++ .../io/compress/zstd/ZStandardDecompressor.java | 323 ++++++++++++ .../hadoop/io/compress/zstd/package-info.java | 22 + .../apache/hadoop/util/NativeCodeLoader.java | 5 + .../hadoop/util/NativeLibraryChecker.java | 12 +- .../io/compress/zstd/ZStandardCompressor.c | 259 ++++++++++ .../io/compress/zstd/ZStandardDecompressor.c | 218 +++++++++ .../zstd/org_apache_hadoop_io_compress_zstd.h | 34 ++ .../org/apache/hadoop/util/NativeCodeLoader.c | 10 + ...g.apache.hadoop.io.compress.CompressionCodec | 2 +- .../src/site/markdown/NativeLibraries.md.vm | 1 + .../apache/hadoop/io/compress/TestCodec.java | 15 +- .../io/compress/TestCompressionStreamReuse.java | 
8 + .../TestZStandardCompressorDecompressor.java | 485 +++++++++++++++++++ .../src/test/resources/zstd/test_file.txt | 71 +++ .../src/test/resources/zstd/test_file.txt.zst | Bin 0 -> 3690 bytes hadoop-project-dist/pom.xml | 6 + hadoop-project/pom.xml | 2 + 25 files changed, 2107 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/BUILDING.txt ---------------------------------------------------------------------- diff --git a/BUILDING.txt b/BUILDING.txt index 2f1f9f8..e596504 100644 --- a/BUILDING.txt +++ b/BUILDING.txt @@ -77,6 +77,8 @@ Optional packages: $ sudo apt-get install libjansson-dev * Linux FUSE $ sudo apt-get install fuse libfuse-dev +* ZStandard compression + $ sudo apt-get install zstd ---------------------------------------------------------------------------------- Maven main modules: @@ -148,6 +150,28 @@ Maven build goals: and it ignores the -Dsnappy.prefix option. If -Dsnappy.lib isn't given, the bundling and building will fail. + ZStandard build options: + ZStandard is a compression library that can be utilized by the native code. + It is currently an optional component, meaning that Hadoop can be built with + or without this dependency. + * Use -Drequire.zstd to fail the build if libzstd.so is not found. + If this option is not specified and the zstd library is missing, the build continues and produces a libhadoop without zstd support. + * Use -Dzstd.prefix to specify a nonstandard location for the libzstd + header files and library files. You do not need this option if you have + installed zstandard using a package manager. + * Use -Dzstd.lib to specify a nonstandard location for the libzstd library + files. Similarly to zstd.prefix, you do not need this option if you have + installed using a package manager. + * Use -Dbundle.zstd to copy the contents of the zstd.lib directory into + the final tar file. This option requires that -Dzstd.lib is also given, + and it ignores the -Dzstd.prefix option. 
If -Dzstd.lib isn't given, the + bundling and building will fail. OpenSSL build options: OpenSSL includes a crypto library that can be utilized by the native code. http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/pom.xml \ b/hadoop-common-project/hadoop-common/pom.xml index ddc85b8..ee7acb3 100644 --- a/hadoop-common-project/hadoop-common/pom.xml +++ b/hadoop-common-project/hadoop-common/pom.xml @@ -514,6 +514,10 @@ <snappy.lib></snappy.lib> <snappy.include></snappy.include> <require.snappy>false</require.snappy> + <zstd.prefix></zstd.prefix> + <zstd.lib></zstd.lib> + <zstd.include></zstd.include> + <require.zstd>false</require.zstd> <openssl.prefix></openssl.prefix> <openssl.lib></openssl.lib> <openssl.include></openssl.include> @@ -568,6 +572,8 @@ <javahClassName>org.apache.hadoop.security.JniBasedUnixGroupsNetgroupMapping</javahClassName> <javahClassName>org.apache.hadoop.io.compress.snappy.SnappyCompressor</javahClassName> <javahClassName>org.apache.hadoop.io.compress.snappy.SnappyDecompressor</javahClassName> + \ <javahClassName>org.apache.hadoop.io.compress.zstd.ZStandardCompressor</javahClassName> + \ <javahClassName>org.apache.hadoop.io.compress.zstd.ZStandardDecompressor</javahClassName> <javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Compressor</javahClassName> <javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Decompressor</javahClassName> <javahClassName>org.apache.hadoop.crypto.OpensslCipher</javahClassName> @@ -599,6 \ +605,10 @@ <CUSTOM_SNAPPY_PREFIX>${snappy.prefix}</CUSTOM_SNAPPY_PREFIX> <CUSTOM_SNAPPY_LIB>${snappy.lib} </CUSTOM_SNAPPY_LIB> <CUSTOM_SNAPPY_INCLUDE>${snappy.include} \ </CUSTOM_SNAPPY_INCLUDE> + \ <REQUIRE_ZSTD>${require.zstd}</REQUIRE_ZSTD> + \ <CUSTOM_ZSTD_PREFIX>${zstd.prefix}</CUSTOM_ZSTD_PREFIX> + \ <CUSTOM_ZSTD_LIB>${zstd.lib}</CUSTOM_ZSTD_LIB> + \ 
<CUSTOM_ZSTD_INCLUDE>${zstd.include}</CUSTOM_ZSTD_INCLUDE> <REQUIRE_OPENSSL>${require.openssl} </REQUIRE_OPENSSL> <CUSTOM_OPENSSL_PREFIX>${openssl.prefix} \ </CUSTOM_OPENSSL_PREFIX> <CUSTOM_OPENSSL_LIB>${openssl.lib} </CUSTOM_OPENSSL_LIB> @@ -636,6 +646,11 @@ <snappy.include></snappy.include> <require.snappy>false</require.snappy> <bundle.snappy.in.bin>true</bundle.snappy.in.bin> + <zstd.prefix></zstd.prefix> + <zstd.lib></zstd.lib> + <zstd.include></zstd.include> + <require.ztsd>false</require.ztsd> + <bundle.zstd.in.bin>true</bundle.zstd.in.bin> <openssl.prefix></openssl.prefix> <openssl.lib></openssl.lib> <openssl.include></openssl.include> @@ -685,6 +700,8 @@ <javahClassName>org.apache.hadoop.security.JniBasedUnixGroupsNetgroupMapping</javahClassName> <javahClassName>org.apache.hadoop.io.compress.snappy.SnappyCompressor</javahClassName> <javahClassName>org.apache.hadoop.io.compress.snappy.SnappyDecompressor</javahClassName> + \ <javahClassName>org.apache.hadoop.io.compress.zstd.ZStandardCompressor</javahClassName> + \ <javahClassName>org.apache.hadoop.io.compress.zstd.ZStandardDecompressor</javahClassName> <javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Compressor</javahClassName> <javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Decompressor</javahClassName> <javahClassName>org.apache.hadoop.crypto.OpensslCipher</javahClassName> @@ -736,6 \ +753,10 @@ <argument>/p:CustomSnappyLib=${snappy.lib}</argument> <argument>/p:CustomSnappyInclude=${snappy.include}</argument> <argument>/p:RequireSnappy=${require.snappy}</argument> + <argument>/p:CustomZstdPrefix=${zstd.prefix}</argument> + <argument>/p:CustomZstdLib=${zstd.lib}</argument> + <argument>/p:CustomZstdInclude=${zstd.include}</argument> + <argument>/p:RequireZstd=${require.ztsd}</argument> <argument>/p:CustomOpensslPrefix=${openssl.prefix}</argument> <argument>/p:CustomOpensslLib=${openssl.lib}</argument> <argument>/p:CustomOpensslInclude=${openssl.include}</argument> 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/CMakeLists.txt \ b/hadoop-common-project/hadoop-common/src/CMakeLists.txt index c93bfe7..3e52643 \ 100644 --- a/hadoop-common-project/hadoop-common/src/CMakeLists.txt +++ b/hadoop-common-project/hadoop-common/src/CMakeLists.txt @@ -94,6 +94,31 @@ else() endif() endif() +# Require Zstandard +SET(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES}) +hadoop_set_find_shared_library_version("1") +find_library(ZSTD_LIBRARY + NAMES zstd + PATHS ${CUSTOM_ZSTD_PREFIX} ${CUSTOM_ZSTD_PREFIX}/lib + ${CUSTOM_ZSTD_PREFIX}/lib64 ${CUSTOM_ZSTD_LIB}) +SET(CMAKE_FIND_LIBRARY_SUFFIXES ${STORED_CMAKE_FIND_LIBRARY_SUFFIXES}) +find_path(ZSTD_INCLUDE_DIR + NAMES zstd.h + PATHS ${CUSTOM_ZSTD_PREFIX} ${CUSTOM_ZSTD_PREFIX}/include + ${CUSTOM_ZSTD_INCLUDE}) +if (ZSTD_LIBRARY AND ZSTD_INCLUDE_DIR) + GET_FILENAME_COMPONENT(HADOOP_ZSTD_LIBRARY ${ZSTD_LIBRARY} NAME) + set(ZSTD_SOURCE_FILES + "${SRC}/io/compress/zstd/ZStandardCompressor.c" + "${SRC}/io/compress/zstd/ZStandardDecompressor.c") +else (ZSTD_LIBRARY AND ZSTD_INCLUDE_DIR) + set(ZSTD_INCLUDE_DIR "") + set(ZSTD_SOURCE_FILES "") + IF(REQUIRE_ZSTD) + MESSAGE(FATAL_ERROR "Required zstandard library could not be found. \ ZSTD_LIBRARY=${ZSTD_LIBRARY}, ZSTD_INCLUDE_DIR=${ZSTD_INCLUDE_DIR}, \ CUSTOM_ZSTD_INCLUDE_DIR=${CUSTOM_ZSTD_INCLUDE_DIR}, \ CUSTOM_ZSTD_PREFIX=${CUSTOM_ZSTD_PREFIX}, \ CUSTOM_ZSTD_INCLUDE=${CUSTOM_ZSTD_INCLUDE}") + ENDIF(REQUIRE_ZSTD) +endif (ZSTD_LIBRARY AND ZSTD_INCLUDE_DIR) # Build hardware CRC32 acceleration, if supported on the platform. 
if(CMAKE_SYSTEM_PROCESSOR MATCHES "^i.86$" OR CMAKE_SYSTEM_PROCESSOR STREQUAL \ "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "amd64") set(BULK_CRC_ARCH_SOURCE_FIlE \ "${SRC}/util/bulk_crc32_x86.c") @@ -169,6 +194,7 @@ include_directories( ${ZLIB_INCLUDE_DIRS} ${BZIP2_INCLUDE_DIR} ${SNAPPY_INCLUDE_DIR} + ${ZSTD_INCLUDE_DIR} ${OPENSSL_INCLUDE_DIR} ${SRC}/util @@ -182,6 +208,7 @@ hadoop_add_dual_library(hadoop ${SRC}/io/compress/lz4/lz4.c ${SRC}/io/compress/lz4/lz4hc.c ${SNAPPY_SOURCE_FILES} + ${ZSTD_SOURCE_FILES} ${OPENSSL_SOURCE_FILES} ${SRC}/io/compress/zlib/ZlibCompressor.c ${SRC}/io/compress/zlib/ZlibDecompressor.c http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/config.h.cmake ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/config.h.cmake \ b/hadoop-common-project/hadoop-common/src/config.h.cmake index d71271d..f2e1586 \ 100644 --- a/hadoop-common-project/hadoop-common/src/config.h.cmake +++ b/hadoop-common-project/hadoop-common/src/config.h.cmake @@ -21,6 +21,7 @@ #cmakedefine HADOOP_ZLIB_LIBRARY "@HADOOP_ZLIB_LIBRARY@" #cmakedefine HADOOP_BZIP2_LIBRARY "@HADOOP_BZIP2_LIBRARY@" #cmakedefine HADOOP_SNAPPY_LIBRARY "@HADOOP_SNAPPY_LIBRARY@" +#cmakedefine HADOOP_ZSTD_LIBRARY "@HADOOP_ZSTD_LIBRARY@" #cmakedefine HADOOP_OPENSSL_LIBRARY "@HADOOP_OPENSSL_LIBRARY@" #cmakedefine HAVE_SYNC_FILE_RANGE #cmakedefine HAVE_POSIX_FADVISE http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java \ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index ac634ab..49062ea 100644 --- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -141,6 +141,22 @@ public class CommonConfigurationKeys extends \ CommonConfigurationKeysPublic { public static final int \ IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT = 256 * 1024; + /** ZStandard compression level. */ + public static final String IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY = + "io.compression.codec.zstd.level"; + /** Default value for IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY. */ + public static final int IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT = 3; + /** ZStandard buffer size. */ + public static final String IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY = + "io.compression.codec.zstd.buffersize"; + /** ZStandard buffer size a value of 0 means use the recommended zstd + * buffer size that the library recommends. */ + public static final int + IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT = 0; /** Internal buffer size for Lz4 compressor/decompressors */ public static final String IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY = "io.compression.codec.lz4.buffersize"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Decompressor.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Decompressor.java \ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Decompressor.java index 8cb0b2a..3808003 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Decompressor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Decompressor.java @@ -95,7 +95,7 @@ public interface Decompressor { * @param b Buffer for the compressed data * @param off Start offset of the data * 
@param len Size of the buffer - * @return The actual number of bytes of compressed data. + * @return The actual number of bytes of uncompressed data. * @throws IOException public int decompress(byte[] b, int off, int len) throws IOException; http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java \ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java new file mode 100644 index 0000000..11e98a1 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java @@ -0,0 +1,242 @@ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+package org.apache.hadoop.io.compress; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.io.compress.zstd.ZStandardCompressor; +import org.apache.hadoop.io.compress.zstd.ZStandardDecompressor; +import org.apache.hadoop.util.NativeCodeLoader; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY; + * This class creates zstd compressors/decompressors. +public class ZStandardCodec implements + Configurable, CompressionCodec, DirectDecompressionCodec { + private Configuration conf; + /** + * Set the configuration to be used by this object. + * + * @param conf the configuration object. + */ + @Override + public void setConf(Configuration conf) { + this.conf = conf; + /** + * Return the configuration used by this object. + * + * @return the configuration object used by this object. 
+ */ + @Override + public Configuration getConf() { + return conf; + public static void checkNativeCodeLoaded() { + if (!NativeCodeLoader.isNativeCodeLoaded() || + !NativeCodeLoader.buildSupportsZstd()) { + throw new RuntimeException("native zStandard library " + + "not available: this version of libhadoop was built " + + "without zstd support."); + } + if (!ZStandardCompressor.isNativeCodeLoaded()) { + throw new RuntimeException("native zStandard library not " + + "available: ZStandardCompressor has not been loaded."); + } + if (!ZStandardDecompressor.isNativeCodeLoaded()) { + throw new RuntimeException("native zStandard library not " + + "available: ZStandardDecompressor has not been loaded."); + } + public static boolean isNativeCodeLoaded() { + return ZStandardCompressor.isNativeCodeLoaded() + && ZStandardDecompressor.isNativeCodeLoaded(); + public static String getLibraryName() { + return ZStandardCompressor.getLibraryName(); + public static int getCompressionLevel(Configuration conf) { + return conf.getInt( + CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY, + CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT); + public static int getCompressionBufferSize(Configuration conf) { + int bufferSize = getBufferSize(conf); + return bufferSize == 0 ? + ZStandardCompressor.getRecommendedBufferSize() : + bufferSize; + public static int getDecompressionBufferSize(Configuration conf) { + int bufferSize = getBufferSize(conf); + return bufferSize == 0 ? + ZStandardDecompressor.getRecommendedBufferSize() : + bufferSize; + private static int getBufferSize(Configuration conf) { + return conf.getInt(IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY, + IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT); + /** + * Create a {@link CompressionOutputStream} that will write to the given + * {@link OutputStream}. 
+ * + * @param out the location for the final output stream + * @return a stream the user can write uncompressed data to have compressed + * @throws IOException + */ + @Override + public CompressionOutputStream createOutputStream(OutputStream out) + throws IOException { + return Util. + createOutputStreamWithCodecPool(this, conf, out); + /** + * Create a {@link CompressionOutputStream} that will write to the given + * {@link OutputStream} with the given {@link Compressor}. + * + * @param out the location for the final output stream + * @param compressor compressor to use + * @return a stream the user can write uncompressed data to have compressed + * @throws IOException + */ + @Override + public CompressionOutputStream createOutputStream(OutputStream out, + Compressor compressor) + throws IOException { + checkNativeCodeLoaded(); + return new CompressorStream(out, compressor, + getCompressionBufferSize(conf)); + /** + * Get the type of {@link Compressor} needed by this {@link CompressionCodec}. + * + * @return the type of compressor needed by this codec. + */ + @Override + public Class<? extends Compressor> getCompressorType() { + checkNativeCodeLoaded(); + return ZStandardCompressor.class; + /** + * Create a new {@link Compressor} for use by this {@link CompressionCodec}. + * + * @return a new compressor for use by this codec + */ + @Override + public Compressor createCompressor() { + checkNativeCodeLoaded(); + return new ZStandardCompressor( + getCompressionLevel(conf), getCompressionBufferSize(conf)); + /** + * Create a {@link CompressionInputStream} that will read from the given + * input stream. + * + * @param in the stream to read compressed bytes from + * @return a stream to read uncompressed bytes from + * @throws IOException + */ + @Override + public CompressionInputStream createInputStream(InputStream in) + throws IOException { + return Util. 
+ createInputStreamWithCodecPool(this, conf, in); + /** + * Create a {@link CompressionInputStream} that will read from the given + * {@link InputStream} with the given {@link Decompressor}. + * + * @param in the stream to read compressed bytes from + * @param decompressor decompressor to use + * @return a stream to read uncompressed bytes from + * @throws IOException + */ + @Override + public CompressionInputStream createInputStream(InputStream in, + Decompressor decompressor) + throws IOException { + checkNativeCodeLoaded(); + return new DecompressorStream(in, decompressor, + getDecompressionBufferSize(conf)); + /** + * Get the type of {@link Decompressor} needed by + * this {@link CompressionCodec}. + * + * @return the type of decompressor needed by this codec. + */ + @Override + public Class<? extends Decompressor> getDecompressorType() { + checkNativeCodeLoaded(); + return ZStandardDecompressor.class; + /** + * Create a new {@link Decompressor} for use by this {@link CompressionCodec}. + * + * @return a new decompressor for use by this codec + */ + @Override + public Decompressor createDecompressor() { + checkNativeCodeLoaded(); + return new ZStandardDecompressor(getDecompressionBufferSize(conf)); + /** + * Get the default filename extension for this kind of compression. + * + * @return <code>.zst</code>. 
+ */ + @Override + public String getDefaultExtension() { + return ".zst"; + @Override + public DirectDecompressor createDirectDecompressor() { + return new ZStandardDecompressor.ZStandardDirectDecompressor( + getDecompressionBufferSize(conf) + ); http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hado \ op-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java \ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java new file mode 100644 index 0000000..eb2121a --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java @@ -0,0 +1,305 @@ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+package org.apache.hadoop.io.compress.zstd; +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.ZStandardCodec; +import org.apache.hadoop.util.NativeCodeLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.ByteBuffer; + * A {@link Compressor} based on the zStandard compression algorithm. + * https://github.com/facebook/zstd +public class ZStandardCompressor implements Compressor { + private static final Logger LOG = + LoggerFactory.getLogger(ZStandardCompressor.class); + private long stream; + private int level; + private int directBufferSize; + private byte[] userBuf = null; + private int userBufOff = 0, userBufLen = 0; + private ByteBuffer uncompressedDirectBuf = null; + private int uncompressedDirectBufOff = 0, uncompressedDirectBufLen = 0; + private boolean keepUncompressedBuf = false; + private ByteBuffer compressedDirectBuf = null; + private boolean finish, finished; + private long bytesRead = 0; + private long bytesWritten = 0; + private static boolean nativeZStandardLoaded = false; + static { + if (NativeCodeLoader.isNativeCodeLoaded()) { + try { + // Initialize the native library + initIDs(); + nativeZStandardLoaded = true; + } catch (Throwable t) { + LOG.warn("Error loading zstandard native libraries: " + t); + } + } + public static boolean isNativeCodeLoaded() { + return nativeZStandardLoaded; + public static int getRecommendedBufferSize() { + return getStreamSize(); + @VisibleForTesting + ZStandardCompressor() { + this(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT, + CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT); + /** + * Creates a new compressor with the default compression level. 
+ * Compressed data will be generated in ZStandard format. + */ + public ZStandardCompressor(int level, int bufferSize) { + this(level, bufferSize, bufferSize); + @VisibleForTesting + ZStandardCompressor(int level, int inputBufferSize, int outputBufferSize) { + this.level = level; + stream = create(); + this.directBufferSize = outputBufferSize; + uncompressedDirectBuf = ByteBuffer.allocateDirect(inputBufferSize); + compressedDirectBuf = ByteBuffer.allocateDirect(outputBufferSize); + compressedDirectBuf.position(outputBufferSize); + reset(); + /** + * Prepare the compressor to be used in a new stream with settings defined in + * the given Configuration. It will reset the compressor's compression level + * and compression strategy. + * + * @param conf Configuration storing new settings + */ + @Override + public void reinit(Configuration conf) { + if (conf == null) { + return; + } + level = ZStandardCodec.getCompressionLevel(conf); + reset(); + LOG.debug("Reinit compressor with new compression configuration"); + @Override + public void setInput(byte[] b, int off, int len) { + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + this.userBuf = b; + this.userBufOff = off; + this.userBufLen = len; + uncompressedDirectBufOff = 0; + setInputFromSavedData(); + compressedDirectBuf.limit(directBufferSize); + compressedDirectBuf.position(directBufferSize); + //copy enough data from userBuf to uncompressedDirectBuf + private void setInputFromSavedData() { + int len = Math.min(userBufLen, uncompressedDirectBuf.remaining()); + uncompressedDirectBuf.put(userBuf, userBufOff, len); + userBufLen -= len; + userBufOff += len; + uncompressedDirectBufLen = uncompressedDirectBuf.position(); + @Override + public void setDictionary(byte[] b, int off, int len) { + throw new UnsupportedOperationException( + "Dictionary support is not enabled"); + @Override + public boolean needsInput() 
{ + // Consume remaining compressed data? + if (compressedDirectBuf.remaining() > 0) { + return false; + } + // have we consumed all input + if (keepUncompressedBuf && uncompressedDirectBufLen > 0) { + return false; + } + if (uncompressedDirectBuf.remaining() > 0) { + // Check if we have consumed all user-input + if (userBufLen <= 0) { + return true; + } else { + // copy enough data from userBuf to uncompressedDirectBuf + setInputFromSavedData(); + // uncompressedDirectBuf is not full + return uncompressedDirectBuf.remaining() > 0; + } + } + return false; + @Override + public void finish() { + finish = true; + @Override + public boolean finished() { + // Check if 'zstd' says its 'finished' and all compressed + // data has been consumed + return (finished && compressedDirectBuf.remaining() == 0); + @Override + public int compress(byte[] b, int off, int len) throws IOException { + checkStream(); + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + // Check if there is compressed data + int n = compressedDirectBuf.remaining(); + if (n > 0) { + n = Math.min(n, len); + compressedDirectBuf.get(b, off, n); + return n; + } + // Re-initialize the output direct buffer + compressedDirectBuf.rewind(); + compressedDirectBuf.limit(directBufferSize); + // Compress data + n = deflateBytesDirect( + uncompressedDirectBuf, + uncompressedDirectBufOff, + uncompressedDirectBufLen, + compressedDirectBuf, + directBufferSize + ); + compressedDirectBuf.limit(n); + // Check if we have consumed all input buffer + if (uncompressedDirectBufLen <= 0) { + // consumed all input buffer + keepUncompressedBuf = false; + uncompressedDirectBuf.clear(); + uncompressedDirectBufOff = 0; + uncompressedDirectBufLen = 0; + } else { + // did not consume all input buffer + keepUncompressedBuf = true; + } + // Get at most 'len' bytes + n = Math.min(n, len); + compressedDirectBuf.get(b, off, n); + return 
n; + /** + * Returns the total number of compressed bytes output so far. + * + * @return the total (non-negative) number of compressed bytes output so far + */ + @Override + public long getBytesWritten() { + checkStream(); + return bytesWritten; + /** + * <p>Returns the total number of uncompressed bytes input so far.</p> + * + * @return the total (non-negative) number of uncompressed bytes input so far + */ + @Override + public long getBytesRead() { + checkStream(); + return bytesRead; + @Override + public void reset() { + checkStream(); + init(level, stream); + finish = false; + finished = false; + bytesRead = 0; + bytesWritten = 0; + uncompressedDirectBuf.rewind(); + uncompressedDirectBufOff = 0; + uncompressedDirectBufLen = 0; + keepUncompressedBuf = false; + compressedDirectBuf.limit(directBufferSize); + compressedDirectBuf.position(directBufferSize); + userBufOff = 0; + userBufLen = 0; + @Override + public void end() { + if (stream != 0) { + end(stream); + stream = 0; + } + private void checkStream() { + if (stream == 0) { + throw new NullPointerException(); + } + private native static long create(); + private native static void init(int level, long stream); + private native int deflateBytesDirect(ByteBuffer src, int srcOffset, + int srcLen, ByteBuffer dst, int dstLen); + private static native int getStreamSize(); + private native static void end(long strm); + private native static void initIDs(); + public native static String getLibraryName(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hado \ op-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java \ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java new file mode 100644 index 
0000000..73d73e1 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java @@ -0,0 +1,323 @@ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +package org.apache.hadoop.io.compress.zstd; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.DirectDecompressor; +import org.apache.hadoop.util.NativeCodeLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.ByteBuffer; + * A {@link Decompressor} based on the zStandard compression algorithm. 
+ * https://github.com/facebook/zstd +public class ZStandardDecompressor implements Decompressor { + private static final Logger LOG = + LoggerFactory.getLogger(ZStandardDecompressor.class); + private long stream; + private int directBufferSize; + private ByteBuffer compressedDirectBuf = null; + private int compressedDirectBufOff, bytesInCompressedBuffer; + private ByteBuffer uncompressedDirectBuf = null; + private byte[] userBuf = null; + private int userBufOff = 0, userBufferBytesToConsume = 0; + private boolean finished; + private int remaining = 0; + private static boolean nativeZStandardLoaded = false; + static { + if (NativeCodeLoader.isNativeCodeLoaded()) { + try { + // Initialize the native library + initIDs(); + nativeZStandardLoaded = true; + } catch (Throwable t) { + LOG.warn("Error loading zstandard native libraries: " + t); + } + } + public static boolean isNativeCodeLoaded() { + return nativeZStandardLoaded; + public static int getRecommendedBufferSize() { + return getStreamSize(); + public ZStandardDecompressor() { + this(getStreamSize()); + /** + * Creates a new decompressor. 
+ */ + public ZStandardDecompressor(int bufferSize) { + this.directBufferSize = bufferSize; + compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + uncompressedDirectBuf.position(directBufferSize); + stream = create(); + reset(); + @Override + public void setInput(byte[] b, int off, int len) { + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + this.userBuf = b; + this.userBufOff = off; + this.userBufferBytesToConsume = len; + setInputFromSavedData(); + uncompressedDirectBuf.limit(directBufferSize); + uncompressedDirectBuf.position(directBufferSize); + private void setInputFromSavedData() { + compressedDirectBufOff = 0; + bytesInCompressedBuffer = userBufferBytesToConsume; + if (bytesInCompressedBuffer > directBufferSize) { + bytesInCompressedBuffer = directBufferSize; + } + compressedDirectBuf.rewind(); + compressedDirectBuf.put( + userBuf, userBufOff, bytesInCompressedBuffer); + userBufOff += bytesInCompressedBuffer; + userBufferBytesToConsume -= bytesInCompressedBuffer; + // dictionary is not supported + @Override + public void setDictionary(byte[] b, int off, int len) { + throw new UnsupportedOperationException( + "Dictionary support is not enabled"); + @Override + public boolean needsInput() { + // Consume remaining compressed data? + if (uncompressedDirectBuf.remaining() > 0) { + return false; + } + // Check if we have consumed all input + if (bytesInCompressedBuffer - compressedDirectBufOff <= 0) { + // Check if we have consumed all user-input + if (userBufferBytesToConsume <= 0) { + return true; + } else { + setInputFromSavedData(); + } + } + return false; + // dictionary is not supported. 
+ @Override + public boolean needsDictionary() { + return false; + @Override + public boolean finished() { + // finished == true if ZSTD_decompressStream() returns 0 + // also check we have nothing left in our buffer + return (finished && uncompressedDirectBuf.remaining() == 0); + @Override + public int decompress(byte[] b, int off, int len) + throws IOException { + checkStream(); + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + // Check if there is uncompressed data + int n = uncompressedDirectBuf.remaining(); + if (n > 0) { + return populateUncompressedBuffer(b, off, len, n); + } + // Re-initialize the output direct buffer + uncompressedDirectBuf.rewind(); + uncompressedDirectBuf.limit(directBufferSize); + // Decompress data + n = inflateBytesDirect( + compressedDirectBuf, + compressedDirectBufOff, + bytesInCompressedBuffer, + uncompressedDirectBuf, + 0, + directBufferSize + ); + uncompressedDirectBuf.limit(n); + // Get at most 'len' bytes + return populateUncompressedBuffer(b, off, len, n); + /** + * <p>Returns the number of bytes remaining in the input buffers; + * normally called when finished() is true to determine amount of post-stream + * data.</p> + * + * @return the total (non-negative) number of unprocessed bytes in input + */ + @Override + public int getRemaining() { + checkStream(); + // userBuf + compressedDirectBuf + return userBufferBytesToConsume + remaining; + /** + * Resets everything including the input buffers (user and direct). 
+ */ + @Override + public void reset() { + checkStream(); + init(stream); + remaining = 0; + finished = false; + compressedDirectBufOff = 0; + bytesInCompressedBuffer = 0; + uncompressedDirectBuf.limit(directBufferSize); + uncompressedDirectBuf.position(directBufferSize); + userBufOff = 0; + userBufferBytesToConsume = 0; + @Override + public void end() { + if (stream != 0) { + free(stream); + stream = 0; + } + @Override + protected void finalize() { + reset(); + private void checkStream() { + if (stream == 0) { + throw new NullPointerException("Stream not initialized"); + } + private int populateUncompressedBuffer(byte[] b, int off, int len, int n) { + n = Math.min(n, len); + uncompressedDirectBuf.get(b, off, n); + return n; + private native static void initIDs(); + private native static long create(); + private native static void init(long stream); + private native int inflateBytesDirect(ByteBuffer src, int srcOffset, + int srcLen, ByteBuffer dst, int dstOffset, int dstLen); + private native static void free(long strm); + private native static int getStreamSize(); + int inflateDirect(ByteBuffer src, ByteBuffer dst) throws IOException { + assert + (this instanceof ZStandardDecompressor.ZStandardDirectDecompressor); + int originalPosition = dst.position(); + int n = inflateBytesDirect( + src, src.position(), src.remaining(), dst, dst.position(), + dst.remaining() + ); + dst.position(originalPosition + n); + if (bytesInCompressedBuffer > 0) { + src.position(compressedDirectBufOff); + } else { + src.position(src.limit()); + } + return n; + /** + * A {@link DirectDecompressor} for ZStandard + * https://github.com/facebook/zstd . 
+ */ + public static class ZStandardDirectDecompressor + extends ZStandardDecompressor implements DirectDecompressor { + public ZStandardDirectDecompressor(int directBufferSize) { + super(directBufferSize); + } + @Override + public boolean finished() { + return (endOfInput && super.finished()); + } + @Override + public void reset() { + super.reset(); + endOfInput = true; + } + private boolean endOfInput; + @Override + public void decompress(ByteBuffer src, ByteBuffer dst) + throws IOException { + assert dst.isDirect() : "dst.isDirect()"; + assert src.isDirect() : "src.isDirect()"; + assert dst.remaining() > 0 : "dst.remaining() > 0"; + this.inflateDirect(src, dst); + endOfInput = !src.hasRemaining(); + } + @Override + public void setDictionary(byte[] b, int off, int len) { + throw new UnsupportedOperationException( + "byte[] arrays are not supported for DirectDecompressor"); + } + @Override + public int decompress(byte[] b, int off, int len) { + throw new UnsupportedOperationException( + "byte[] arrays are not supported for DirectDecompressor"); + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/package-info.java \ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/package-info.java new file mode 100644 index 0000000..9069070 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/package-info.java @@ -0,0 +1,22 @@ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. [email protected] [email protected] +package org.apache.hadoop.io.compress.zstd; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java \ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java index 533fc07..ff5803c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java @@ -80,6 +80,11 @@ public class NativeCodeLoader { public static native boolean buildSupportsSnappy(); + * Returns true only if this build was compiled with support for ZStandard. + */ + public static native boolean buildSupportsZstd(); + /** * Returns true only if this build was compiled with support for openssl. 
public static native boolean buildSupportsOpenssl(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java \ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java index d8c6899..c3ffe58 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.OpensslCipher; import org.apache.hadoop.io.compress.Lz4Codec; import org.apache.hadoop.io.compress.SnappyCodec; +import org.apache.hadoop.io.compress.ZStandardCodec; import org.apache.hadoop.io.compress.bzip2.Bzip2Factory; import org.apache.hadoop.io.compress.zlib.ZlibFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -65,6 +66,7 @@ public class NativeLibraryChecker { boolean nativeHadoopLoaded = NativeCodeLoader.isNativeCodeLoaded(); boolean zlibLoaded = false; boolean snappyLoaded = false; + boolean zStdLoaded = false; // lz4 is linked within libhadoop boolean lz4Loaded = nativeHadoopLoaded; boolean bzip2Loaded = Bzip2Factory.isNativeBzip2Loaded(conf); @@ -75,6 +77,7 @@ public class NativeLibraryChecker { String hadoopLibraryName = ""; String zlibLibraryName = ""; String snappyLibraryName = ""; + String zstdLibraryName = ""; String lz4LibraryName = ""; String bzip2LibraryName = ""; String winutilsPath = null; @@ -90,6 +93,11 @@ public class NativeLibraryChecker { if (snappyLoaded && NativeCodeLoader.buildSupportsSnappy()) { snappyLibraryName = SnappyCodec.getLibraryName(); + zStdLoaded = NativeCodeLoader.buildSupportsZstd() && 
+ ZStandardCodec.isNativeCodeLoaded(); + if (zStdLoaded && NativeCodeLoader.buildSupportsZstd()) { + zstdLibraryName = ZStandardCodec.getLibraryName(); + } if (OpensslCipher.getLoadingFailureReason() != null) { openSslDetail = OpensslCipher.getLoadingFailureReason(); openSslLoaded = false; @@ -122,6 +130,7 @@ public class NativeLibraryChecker { System.out.printf("hadoop: %b %s%n", nativeHadoopLoaded, hadoopLibraryName); System.out.printf("zlib: %b %s%n", zlibLoaded, zlibLibraryName); System.out.printf("snappy: %b %s%n", snappyLoaded, snappyLibraryName); + System.out.printf("zstd : %b %s%n", zStdLoaded, zstdLibraryName); System.out.printf("lz4: %b %s%n", lz4Loaded, lz4LibraryName); System.out.printf("bzip2: %b %s%n", bzip2Loaded, bzip2LibraryName); System.out.printf("openssl: %b %s%n", openSslLoaded, openSslDetail); @@ -130,7 +139,8 @@ public class NativeLibraryChecker { if ((!nativeHadoopLoaded) || (Shell.WINDOWS && (!winutilsExists)) || - (checkAll && !(zlibLoaded && snappyLoaded && lz4Loaded && bzip2Loaded))) { + (checkAll && !(zlibLoaded && snappyLoaded && lz4Loaded && bzip2Loaded + && zStdLoaded))) { // return 1 to indicated check failed ExitUtil.terminate(1); http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hado \ op-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c \ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c new file mode 100644 index 0000000..04f2a3e --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c @@ -0,0 +1,259 @@ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +#include "org_apache_hadoop_io_compress_zstd.h" +#if defined HADOOP_ZSTD_LIBRARY +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#ifdef UNIX +#include <dlfcn.h> +#include "config.h" +#endif +#include "org_apache_hadoop_io_compress_zstd_ZStandardCompressor.h" +static jfieldID ZStandardCompressor_stream; +static jfieldID ZStandardCompressor_uncompressedDirectBufOff; +static jfieldID ZStandardCompressor_uncompressedDirectBufLen; +static jfieldID ZStandardCompressor_directBufferSize; +static jfieldID ZStandardCompressor_finish; +static jfieldID ZStandardCompressor_finished; +static jfieldID ZStandardCompressor_bytesWritten; +static jfieldID ZStandardCompressor_bytesRead; +#ifdef UNIX +static size_t (*dlsym_ZSTD_CStreamInSize)(void); +static size_t (*dlsym_ZSTD_CStreamOutSize)(void); +static ZSTD_CStream* (*dlsym_ZSTD_createCStream)(void); +static size_t (*dlsym_ZSTD_initCStream)(ZSTD_CStream*, int); +static size_t (*dlsym_ZSTD_freeCStream)(ZSTD_CStream*); +static size_t (*dlsym_ZSTD_compressStream)(ZSTD_CStream*, ZSTD_outBuffer*, \ ZSTD_inBuffer*); +static size_t (*dlsym_ZSTD_endStream)(ZSTD_CStream*, \ ZSTD_outBuffer*); +static size_t (*dlsym_ZSTD_flushStream)(ZSTD_CStream*, \ ZSTD_outBuffer*); +static unsigned (*dlsym_ZSTD_isError)(size_t); +static const char * 
(*dlsym_ZSTD_getErrorName)(size_t); +#endif +#ifdef WINDOWS +typedef size_t (__cdecl *__dlsym_ZSTD_CStreamInSize)(void); +typedef size_t (__cdecl *__dlsym_ZSTD_CStreamOutSize)(void); +typedef ZSTD_CStream* (__cdecl *__dlsym_ZSTD_createCStream)(void); +typedef size_t (__cdecl *__dlsym_ZSTD_initCStream)(ZSTD_CStream*, int); +typedef size_t (__cdecl *__dlsym_ZSTD_freeCStream)(ZSTD_CStream*); +typedef size_t (__cdecl *__dlsym_ZSTD_compressStream)(ZSTD_CStream*, \ ZSTD_outBuffer*, ZSTD_inBuffer*); +typedef size_t (__cdecl \ *__dlsym_ZSTD_endStream)(ZSTD_CStream*, ZSTD_outBuffer*); +typedef size_t (__cdecl \ *__dlsym_ZSTD_flushStream)(ZSTD_CStream*, ZSTD_outBuffer*); +typedef unsigned \ (__cdecl *__dlsym_ZSTD_isError)(size_t); +typedef const char * (__cdecl \ *__dlsym_ZSTD_getErrorName)(size_t); + +static __dlsym_ZSTD_CStreamInSize dlsym_ZSTD_CStreamInSize; +static __dlsym_ZSTD_CStreamOutSize dlsym_ZSTD_CStreamOutSize; +static __dlsym_ZSTD_createCStream dlsym_ZSTD_createCStream; +static __dlsym_ZSTD_initCStream dlsym_ZSTD_initCStream; +static __dlsym_ZSTD_freeCStream dlsym_ZSTD_freeCStream; +static __dlsym_ZSTD_compressStream dlsym_ZSTD_compressStream; +static __dlsym_ZSTD_endStream dlsym_ZSTD_endStream; +static __dlsym_ZSTD_flushStream dlsym_ZSTD_flushStream; +static __dlsym_ZSTD_isError dlsym_ZSTD_isError; +static __dlsym_ZSTD_getErrorName dlsym_ZSTD_getErrorName; +#endif +// Load the libztsd.so from disk +JNIEXPORT void JNICALL \ Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_initIDs (JNIEnv *env, \ jclass clazz) { +#ifdef UNIX + // Load libzstd.so + void *libzstd = dlopen(HADOOP_ZSTD_LIBRARY, RTLD_LAZY | RTLD_GLOBAL); + if (!libzstd) { + char* msg = (char*)malloc(10000); + snprintf(msg, 10000, "%s (%s)!", "Cannot load " HADOOP_ZSTD_LIBRARY, \ dlerror()); + THROW(env, "java/lang/InternalError", msg); + return; + } +#endif +#ifdef WINDOWS + HMODULE libzstd = LoadLibrary(HADOOP_ZSTD_LIBRARY); + if (!libzstd) { + THROW(env, "java/lang/UnsatisfiedLinkError", 
"Cannot load zstd.dll"); + return; + } +#endif +#ifdef UNIX + // load dynamic symbols + dlerror(); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_CStreamInSize, env, libzstd, \ "ZSTD_CStreamInSize"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_CStreamOutSize, env, \ libzstd, "ZSTD_CStreamOutSize"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_createCStream, \ env, libzstd, "ZSTD_createCStream"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_initCStream, \ env, libzstd, "ZSTD_initCStream"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_freeCStream, \ env, libzstd, "ZSTD_freeCStream"); + \ LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_compressStream, env, libzstd, "ZSTD_compressStream"); \ + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_endStream, env, libzstd, "ZSTD_endStream"); + \ LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream"); + \ LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError"); + \ LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName"); \ +#endif + +#ifdef WINDOWS + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_CStreamInSize, dlsym_ZSTD_CStreamInSize, env, \ libzstd, "ZSTD_CStreamInSize"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_CStreamOutSize, \ dlsym_ZSTD_CStreamOutSize, env, libzstd, "ZSTD_CStreamOutSize"); + \ LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_createCStream, dlsym_ZSTD_createCStream, env, \ libzstd, "ZSTD_createCStream"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_initCStream, \ dlsym_ZSTD_initCStream, env, libzstd, "ZSTD_initCStream"); + \ LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_freeCStream, dlsym_ZSTD_freeCStream, env, libzstd, \ "ZSTD_freeCStream"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_compressStream, \ dlsym_ZSTD_compressStream, env, libzstd, "ZSTD_compressStream"); + \ LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_endStream, dlsym_ZSTD_endStream, env, libzstd, \ "ZSTD_endStream"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_flushStream, \ dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream"); + \ LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_isError, dlsym_ZSTD_isError, env, libzstd, \ "ZSTD_isError"); + 
LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_getErrorName, \ dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName"); +#endif + // load fields + ZStandardCompressor_stream = (*env)->GetFieldID(env, clazz, "stream", "J"); + ZStandardCompressor_finish = (*env)->GetFieldID(env, clazz, "finish", "Z"); + ZStandardCompressor_finished = (*env)->GetFieldID(env, clazz, "finished", "Z"); + ZStandardCompressor_uncompressedDirectBufOff = (*env)->GetFieldID(env, clazz, \ "uncompressedDirectBufOff", "I"); + ZStandardCompressor_uncompressedDirectBufLen = \ (*env)->GetFieldID(env, clazz, "uncompressedDirectBufLen", "I"); + \ ZStandardCompressor_directBufferSize = (*env)->GetFieldID(env, clazz, \ "directBufferSize", "I"); + ZStandardCompressor_bytesRead = \ (*env)->GetFieldID(env, clazz, "bytesRead", "J"); + \ ZStandardCompressor_bytesWritten = (*env)->GetFieldID(env, clazz, "bytesWritten", \ "J"); +} +// Create the compression stream +JNIEXPORT jlong JNICALL \ Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_create (JNIEnv *env, \ jobject this) { + ZSTD_CStream* const stream = dlsym_ZSTD_createCStream(); + if (stream == NULL) { + THROW(env, "java/lang/InternalError", "Error creating the stream"); + return (jlong)0; + } + return (jlong) stream; +// Initialize the compression stream +JNIEXPORT void JNICALL \ Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_init (JNIEnv *env, \ jobject this, jint level, jlong stream) { + size_t result = \ dlsym_ZSTD_initCStream((ZSTD_CStream *) stream, level); + if \ (dlsym_ZSTD_isError(result)) { + THROW(env, "java/lang/InternalError", \ dlsym_ZSTD_getErrorName(result)); + return; + } +// free the compression stream +JNIEXPORT void JNICALL \ Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_end (JNIEnv *env, jobject \ this, jlong stream) { + size_t result = dlsym_ZSTD_freeCStream((ZSTD_CStream *) \ stream); + if (dlsym_ZSTD_isError(result)) { + THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result)); + return; 
+ } +JNIEXPORT jint Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_deflateBytesDirect +(JNIEnv *env, jobject this, jobject uncompressed_direct_buf, jint \ uncompressed_direct_buf_off, jint uncompressed_direct_buf_len, jobject \ compressed_direct_buf, jint compressed_direct_buf_len ) { + ZSTD_CStream* const \ stream = (ZSTD_CStream*) (*env)->GetLongField(env, this, ZStandardCompressor_stream); \ + if (!stream) { + THROW(env, "java/lang/NullPointerException", NULL); + return (jint)0; + } + jlong bytes_read = (*env)->GetLongField(env, this, \ ZStandardCompressor_bytesRead); + jlong bytes_written = (*env)->GetLongField(env, \ this, ZStandardCompressor_bytesWritten); + jboolean finish = \ (*env)->GetBooleanField(env, this, ZStandardCompressor_finish); + + // Get the input direct buffer + void * uncompressed_bytes = (*env)->GetDirectBufferAddress(env, \ uncompressed_direct_buf); + if (!uncompressed_bytes) { + THROW(env, "java/lang/InternalError", "Undefined memory address for \ uncompressedDirectBuf"); + return (jint) 0; + } + // Get the output direct buffer + void * compressed_bytes = (*env)->GetDirectBufferAddress(env, \ compressed_direct_buf); + if (!compressed_bytes) { + THROW(env, "java/lang/InternalError", "Undefined memory address for \ compressedDirectBuf"); + return (jint) 0; + } + ZSTD_inBuffer input = { uncompressed_bytes, uncompressed_direct_buf_len, \ uncompressed_direct_buf_off }; + ZSTD_outBuffer output = { compressed_bytes, \ compressed_direct_buf_len, 0 }; + + size_t size = dlsym_ZSTD_compressStream(stream, &output, &input); + if (dlsym_ZSTD_isError(size)) { + THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(size)); + return (jint) 0; + } + if (finish && input.pos == input.size) { + // end the stream, flush and write the frame epilogue + size = dlsym_ZSTD_endStream(stream, &output); + if (!size) { + (*env)->SetBooleanField(env, this, ZStandardCompressor_finished, \ JNI_TRUE); + } + } else { + // need to flush the output buffer + 
// this also updates the output buffer position. + size = dlsym_ZSTD_flushStream(stream, &output); + } + if (dlsym_ZSTD_isError(size)) { + THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(size)); + return (jint) 0; + } + bytes_read += input.pos; + bytes_written += output.pos; + (*env)->SetLongField(env, this, ZStandardCompressor_bytesRead, bytes_read); + (*env)->SetLongField(env, this, ZStandardCompressor_bytesWritten, \ bytes_written); + + (*env)->SetIntField(env, this, ZStandardCompressor_uncompressedDirectBufOff, \ input.pos); + (*env)->SetIntField(env, this, \ ZStandardCompressor_uncompressedDirectBufLen, input.size - input.pos); + return \ (jint) output.pos; +} +JNIEXPORT jstring JNICALL \ Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_getLibraryName +(JNIEnv \ *env, jclass class) { +#ifdef UNIX + if (dlsym_ZSTD_isError) { + Dl_info dl_info; + if (dladdr( dlsym_ZSTD_isError, &dl_info)) { + return (*env)->NewStringUTF(env, dl_info.dli_fname); + } + } + return (*env)->NewStringUTF(env, HADOOP_ZSTD_LIBRARY); +#endif +#ifdef WINDOWS + LPWSTR filename = NULL; + GetLibraryName(dlsym_ZSTD_isError, &filename); + if (filename != NULL) { + return (*env)->NewString(env, filename, (jsize) wcslen(filename)); + } else { + return (*env)->NewStringUTF(env, "Unavailable"); + } +#endif +// returns the max size of the recommended input and output buffers +JNIEXPORT jint JNICALL \ Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_getStreamSize +(JNIEnv \ *env, jobject this) { + int x = (int) dlsym_ZSTD_CStreamInSize(); + int y = (int) dlsym_ZSTD_CStreamOutSize(); + return (x >= y) ? 
x : y; +#endif //define HADOOP_ZSTD_LIBRARY \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hado \ op-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.c ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.c \ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.c new file mode 100644 index 0000000..1236756 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.c @@ -0,0 +1,218 @@ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+#include "org_apache_hadoop_io_compress_zstd.h" +#if defined HADOOP_ZSTD_LIBRARY +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#ifdef UNIX +#include <dlfcn.h> +#include "config.h" +#endif +#include "org_apache_hadoop_io_compress_zstd_ZStandardDecompressor.h" +static jfieldID ZStandardDecompressor_stream; +static jfieldID ZStandardDecompressor_compressedDirectBufOff; +static jfieldID ZStandardDecompressor_bytesInCompressedBuffer; +static jfieldID ZStandardDecompressor_directBufferSize; +static jfieldID ZStandardDecompressor_finished; +static jfieldID ZStandardDecompressor_remaining; +#ifdef UNIX +static size_t (*dlsym_ZSTD_DStreamOutSize)(void); +static size_t (*dlsym_ZSTD_DStreamInSize)(void); +static ZSTD_DStream* (*dlsym_ZSTD_createDStream)(void); +static size_t (*dlsym_ZSTD_initDStream)(ZSTD_DStream*); +static size_t (*dlsym_ZSTD_freeDStream)(ZSTD_DStream*); +static size_t (*dlsym_ZSTD_resetDStream)(ZSTD_DStream*); +static size_t (*dlsym_ZSTD_decompressStream)(ZSTD_DStream*, ZSTD_outBuffer*, \ ZSTD_inBuffer*); +static size_t (*dlsym_ZSTD_flushStream)(ZSTD_CStream*, \ ZSTD_outBuffer*); +static unsigned (*dlsym_ZSTD_isError)(size_t); +static const char * (*dlsym_ZSTD_getErrorName)(size_t); +#endif +#ifdef WINDOWS +typedef size_t (__cdecl *__dlsym_ZSTD_DStreamOutSize)(void); +typedef size_t (__cdecl *__dlsym_ZSTD_DStreamInSize)(void); +typedef ZSTD_DStream* (__cdecl *__dlsym_ZSTD_createDStream)(void); +typedef size_t (__cdecl *__dlsym_ZSTD_initDStream)(ZSTD_DStream*); +typedef size_t (__cdecl *__dlsym_ZSTD_freeDStream)(ZSTD_DStream*); +typedef size_t (__cdecl *__dlsym_ZSTD_resetDStream)(ZSTD_DStream*); +typedef size_t (__cdecl *__dlsym_ZSTD_decompressStream)(ZSTD_DStream*, \ ZSTD_outBuffer*, ZSTD_inBuffer*); +typedef size_t (__cdecl \ *__dlsym_ZSTD_flushStream)(ZSTD_CStream*, ZSTD_outBuffer*); +typedef unsigned \ (__cdecl *__dlsym_ZSTD_isError)(size_t); +typedef const char * (__cdecl \ *__dlsym_ZSTD_getErrorName)(size_t); + +static 
__dlsym_ZSTD_DStreamOutSize dlsym_ZSTD_DStreamOutSize; +static __dlsym_ZSTD_DStreamInSize dlsym_ZSTD_DStreamInSize; +static __dlsym_ZSTD_createDStream dlsym_ZSTD_createDStream; +static __dlsym_ZSTD_initDStream dlsym_ZSTD_initDStream; +static __dlsym_ZSTD_freeDStream dlsym_ZSTD_freeDStream; +static __dlsym_ZSTD_resetDStream dlsym_ZSTD_resetDStream; +static __dlsym_ZSTD_decompressStream dlsym_ZSTD_decompressStream; +static __dlsym_ZSTD_isError dlsym_ZSTD_isError; +static __dlsym_ZSTD_getErrorName dlsym_ZSTD_getErrorName; +static __dlsym_ZSTD_flushStream dlsym_ZSTD_flushStream; +#endif +JNIEXPORT void JNICALL \ Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_initIDs (JNIEnv *env, \ jclass clazz) { + // Load libzstd.so +#ifdef UNIX + void *libzstd = dlopen(HADOOP_ZSTD_LIBRARY, RTLD_LAZY | RTLD_GLOBAL); + if (!libzstd) { + char* msg = (char*)malloc(1000); + snprintf(msg, 1000, "%s (%s)!", "Cannot load " HADOOP_ZSTD_LIBRARY, \ dlerror()); + THROW(env, "java/lang/UnsatisfiedLinkError", msg); + return; + } +#endif +#ifdef WINDOWS + HMODULE libzstd = LoadLibrary(HADOOP_ZSTD_LIBRARY); + if (!libzstd) { + THROW(env, "java/lang/UnsatisfiedLinkError", "Cannot load zstd.dll"); + return; + } +#endif +#ifdef UNIX + dlerror(); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_DStreamOutSize, env, libzstd, \ "ZSTD_DStreamOutSize"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_DStreamInSize, env, \ libzstd, "ZSTD_DStreamInSize"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_createDStream, \ env, libzstd, "ZSTD_createDStream"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_initDStream, \ env, libzstd, "ZSTD_initDStream"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_freeDStream, \ env, libzstd, "ZSTD_freeDStream"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_resetDStream, \ env, libzstd, "ZSTD_resetDStream"); + \ LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_decompressStream, env, libzstd, \ "ZSTD_decompressStream"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_isError, env, libzstd, \ "ZSTD_isError"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_getErrorName, env, libzstd, \ 
"ZSTD_getErrorName"); + LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_flushStream, env, libzstd, \ "ZSTD_flushStream"); +#endif +#ifdef WINDOWS + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_DStreamOutSize, dlsym_ZSTD_DStreamOutSize, env, \ libzstd, "ZSTD_DStreamOutSize"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_DStreamInSize, \ dlsym_ZSTD_DStreamInSize, env, libzstd, "ZSTD_DStreamInSize"); + \ LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_createDStream, dlsym_ZSTD_createDStream, env, \ libzstd, "ZSTD_createDStream"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_initDStream, \ dlsym_ZSTD_initDStream, env, libzstd, "ZSTD_initDStream"); + \ LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_freeDStream, dlsym_ZSTD_freeDStream, env, libzstd, \ "ZSTD_freeDStream"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_resetDStream, \ dlsym_ZSTD_resetDStream, env, libzstd, "ZSTD_resetDStream"); + \ LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_decompressStream, dlsym_ZSTD_decompressStream, env, \ libzstd, "ZSTD_decompressStream"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_isError, \ dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError"); + \ LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_getErrorName, dlsym_ZSTD_getErrorName, env, libzstd, \ "ZSTD_getErrorName"); + LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_flushStream, \ dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream"); +#endif + ZStandardDecompressor_stream = (*env)->GetFieldID(env, clazz, "stream", "J"); + ZStandardDecompressor_finished = (*env)->GetFieldID(env, clazz, "finished", \ "Z"); + ZStandardDecompressor_compressedDirectBufOff = (*env)->GetFieldID(env, \ clazz, "compressedDirectBufOff", "I"); + \ ZStandardDecompressor_bytesInCompressedBuffer = (*env)->GetFieldID(env, clazz, \ "bytesInCompressedBuffer", "I"); + ZStandardDecompressor_directBufferSize = \ (*env)->GetFieldID(env, clazz, "directBufferSize", "I"); + \ ZStandardDecompressor_remaining = (*env)->GetFieldID(env, clazz, "remaining", "I"); \ +JNIEXPORT jlong JNICALL \ Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_create(JNIEnv *env, \ jobject this) { + ZSTD_DStream * 
stream = dlsym_ZSTD_createDStream(); + if (stream == NULL) { + THROW(env, "java/lang/InternalError", "Error creating stream"); + return (jlong) 0; + } + return (jlong) stream; +JNIEXPORT void JNICALL \ Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_init(JNIEnv *env, \ jobject this, jlong stream) { + size_t result = \ dlsym_ZSTD_initDStream((ZSTD_DStream *) stream); + if (dlsym_ZSTD_isError(result)) \ { + THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result)); + return; + } + (*env)->SetLongField(env, this, ZStandardDecompressor_remaining, 0); +JNIEXPORT void JNICALL \ Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_free(JNIEnv *env, \ jclass obj, jlong stream) { + size_t result = dlsym_ZSTD_freeDStream((ZSTD_DStream \ *) stream); + if (dlsym_ZSTD_isError(result)) { + THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result)); + return; + } +JNIEXPORT jint JNICALL \ Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_inflateBytesDirect \ +(JNIEnv *env, jobject this, jobject compressed_direct_buf, jint \ compressed_direct_buf_off, jint compressed_direct_buf_len, jobject \ uncompressed_direct_buf, jint uncompressed_direct_buf_off, jint \ uncompressed_direct_buf_len) { + ZSTD_DStream *stream = (ZSTD_DStream *) \ (*env)->GetLongField(env, this, ZStandardDecompressor_stream); + if (!stream) { + THROW(env, "java/lang/NullPointerException", NULL); + return (jint)0; + } + // Get the input direct buffer + void * compressed_bytes = (*env)->GetDirectBufferAddress(env, \ compressed_direct_buf); + if (!compressed_bytes) { + THROW(env, "java/lang/InternalError", "Undefined memory address for \ compressedDirectBuf"); + return (jint) 0; + } + // Get the output direct buffer + void * uncompressed_bytes = (*env)->GetDirectBufferAddress(env, \ uncompressed_direct_buf); + if (!uncompressed_bytes) { + THROW(env, "java/lang/InternalError", "Undefined memory address for \ uncompressedDirectBuf"); + return (jint) 0; + } + 
uncompressed_bytes = ((char*) uncompressed_bytes) + uncompressed_direct_buf_off; + ZSTD_inBuffer input = { compressed_bytes, compressed_direct_buf_len, \ compressed_direct_buf_off }; + ZSTD_outBuffer output = { uncompressed_bytes, \ uncompressed_direct_buf_len, 0 }; + + size_t const size = dlsym_ZSTD_decompressStream(stream, &output, &input); + // check for errors + if (dlsym_ZSTD_isError(size)) { + THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(size)); + return (jint) 0; + } + int remaining = input.size - input.pos; + (*env)->SetIntField(env, this, ZStandardDecompressor_remaining, remaining); + // the entire frame has been decoded + if (size == 0) { + (*env)->SetBooleanField(env, this, ZStandardDecompressor_finished, \ JNI_TRUE); + size_t result = dlsym_ZSTD_initDStream(stream); + if (dlsym_ZSTD_isError(result)) { + THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result)); + return (jint) 0; + } + } + (*env)->SetIntField(env, this, ZStandardDecompressor_compressedDirectBufOff, \ input.pos); + (*env)->SetIntField(env, this, \ ZStandardDecompressor_bytesInCompressedBuffer, input.size); + return (jint) \ output.pos; +} +// returns the max size of the recommended input and output buffers +JNIEXPORT jint JNICALL \ Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_getStreamSize +(JNIEnv \ *env, jclass obj) { + int x = (int) dlsym_ZSTD_DStreamInSize(); + int y = (int) dlsym_ZSTD_DStreamOutSize(); + return (x >= y) ? 
x : y; +#endif //define HADOOP_ZSTD_LIBRARY \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hado \ op-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/org_apache_hadoop_io_compress_zstd.h ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/org_apache_hadoop_io_compress_zstd.h \ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/org_apache_hadoop_io_compress_zstd.h new file mode 100644 index 0000000..78fc0a4 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/org_apache_hadoop_io_compress_zstd.h @@ -0,0 +1,34 @@ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+#ifndef ORG_APACHE_HADOOP_IO_COMPRESS_ZSTD_ZSTD_H +#define ORG_APACHE_HADOOP_IO_COMPRESS_ZSTD_ZSTD_H +#include "org_apache_hadoop.h" +#ifdef UNIX +#include <dlfcn.h> +#endif +#include <jni.h> +#include <zstd.h> +#include <stddef.h> +#endif //ORG_APACHE_HADOOP_IO_COMPRESS_ZSTD_ZSTD_H http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c \ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c index 3625112..704f40c 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c @@ -39,6 +39,16 @@ JNIEXPORT jboolean JNICALL \ Java_org_apache_hadoop_util_NativeCodeLoader_buildSup #endif +JNIEXPORT jboolean JNICALL \ Java_org_apache_hadoop_util_NativeCodeLoader_buildSupportsZstd + (JNIEnv *env, \ jclass clazz) +{ +#ifdef HADOOP_ZSTD_LIBRARY + return JNI_TRUE; +#else + return JNI_FALSE; +#endif JNIEXPORT jboolean JNICALL \ Java_org_apache_hadoop_util_NativeCodeLoader_buildSupportsOpenssl (JNIEnv *env, \ jclass clazz) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hado \ op-common/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec \ b/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec index df46e32..568972e 100644 --- 
a/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec +++ b/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.io.compress.CompressionCodec @@ -17,4 +17,4 @@ org.apache.hadoop.io.compress.DeflateCodec org.apache.hadoop.io.compress.GzipCodec org.apache.hadoop.io.compress.Lz4Codec org.apache.hadoop.io.compress.SnappyCodec +org.apache.hadoop.io.compress.ZStandardCodec http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/site/markdown/NativeLibraries.md.vm ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/NativeLibraries.md.vm \ b/hadoop-common-project/hadoop-common/src/site/markdown/NativeLibraries.md.vm index \ 04ff426..e4f720c 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/NativeLibraries.md.vm +++ b/hadoop-common-project/hadoop-common/src/site/markdown/NativeLibraries.md.vm @@ -118,6 +118,7 @@ NativeLibraryChecker is a tool to check whether native libraries \ are loaded corr hadoop: true /home/ozawa/hadoop/lib/native/libhadoop.so.1.0.0 zlib: true /lib/x86_64-linux-gnu/libz.so.1 snappy: true /usr/lib/libsnappy.so.1 + zstd: true /usr/lib/libzstd.so.1 lz4: true revision:99 bzip2: false http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java \ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java index 1029517..4bb79a7 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java +++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java @@ -75,6 +75,7 @@ import org.junit.Assert; import org.junit.Assume; import org.junit.Test; import static org.junit.Assert.*; +import static org.junit.Assume.assumeTrue; public class TestCodec { @@ -519,6 +520,18 @@ public class TestCodec { + @Test(timeout=20000) + public void testSequenceFileZStandardCodec() throws Exception { + assumeTrue(ZStandardCodec.isNativeCodeLoaded()); + Configuration conf = new Configuration(); + sequenceFileCodecTest(conf, 0, + "org.apache.hadoop.io.compress.ZStandardCodec", 100); + sequenceFileCodecTest(conf, 100, + "org.apache.hadoop.io.compress.ZStandardCodec", 100); + sequenceFileCodecTest(conf, 200000, + "org.apache.hadoop.io.compress.ZStandardCodec", 1000000); @Test public void testSequenceFileDeflateCodec() throws IOException, \ ClassNotFoundException, InstantiationException, IllegalAccessException { @@ -581,7 +594,7 @@ public class TestCodec { @Test public void testSnappyMapFile() throws Exception { - Assume.assumeTrue(SnappyCodec.isNativeCodeLoaded()); + assumeTrue(SnappyCodec.isNativeCodeLoaded()); codecTestMapFile(SnappyCodec.class, CompressionType.BLOCK, 100); http://git-wip-us.apache.org/repos/asf/hadoop/blob/db947fb8/hadoop-common-project/hado \ op-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java \ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java index 2d75a2d..7b55cac 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java @@ -37,6 +37,7 @@ import 
org.apache.hadoop.util.ReflectionUtils; import org.junit.Test; import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; public class TestCompressionStreamReuse { private static final Log LOG = LogFactory @@ -69,6 +70,13 @@ public class TestCompressionStreamReuse { "org.apache.hadoop.io.compress.GzipCodec"); + @Test + public void testZStandardCompressStreamReuse() throws IOException { + assumeTrue(ZStandardCodec.isNativeCodeLoaded()); + resetStateTest(conf, seed, count, + "org.apache.hadoop.io.compress.ZStandardCodec"); private void resetStateTest(Configuration conf, int seed, int count, String codecClass) throws IOException { // Create the codec --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected] [ prev in list ] [ next in list ] [ prev in thread ] [ next in thread ] Configure | About |