diff --git a/BUILDING.txt b/BUILDING.txt index 5d4ecbda36110..10912934caf49 100644 --- a/BUILDING.txt +++ b/BUILDING.txt @@ -107,8 +107,6 @@ Optional packages: $ sudo apt-get install bzip2 libbz2-dev * Linux FUSE $ sudo apt-get install fuse libfuse-dev -* ZStandard compression - $ sudo apt-get install libzstd1-dev * PMDK library for storage class memory(SCM) as HDFS cache backend Please refer to http://pmem.io/ and https://github.com/pmem/pmdk @@ -196,29 +194,6 @@ Maven build goals: and it ignores the -Dsnappy.prefix option. If -Dsnappy.lib isn't given, the bundling and building will fail. - - ZStandard build options: - - ZStandard is a compression library that can be utilized by the native code. - It is currently an optional component, meaning that Hadoop can be built with - or without this dependency. - - * Use -Drequire.zstd to fail the build if libzstd.so is not found. - If this option is not specified and the zstd library is missing. - - * Use -Dzstd.prefix to specify a nonstandard location for the libzstd - header files and library files. You do not need this option if you have - installed zstandard using a package manager. - - * Use -Dzstd.lib to specify a nonstandard location for the libzstd library - files. Similarly to zstd.prefix, you do not need this option if you have - installed using a package manager. - - * Use -Dbundle.zstd to copy the contents of the zstd.lib directory into - the final tar file. This option requires that -Dzstd.lib is also given, - and it ignores the -Dzstd.prefix option. If -Dzstd.lib isn't given, the - bundling and building will fail. - OpenSSL build options: OpenSSL includes a crypto library that can be utilized by the native code. @@ -556,10 +531,6 @@ Building on Rocky Linux 8 * Install optional dependencies (snappy-devel). $ sudo dnf --enablerepo=PowerTools install snappy-devel -* Install optional dependencies (libzstd-devel). - $ sudo dnf install https://dl.fedoraproject.org/pub/epel/epel-release-latest-8.noarch.rpm - $ sudo dnf --enablerepo=epel install libzstd-devel - * Install optional dependencies (isa-l). $ sudo dnf --enablerepo=PowerTools install nasm $ git clone https://github.com/intel/isa-l diff --git a/dev-support/docker/Dockerfile_rockylinux_8 b/dev-support/docker/Dockerfile_rockylinux_8 index ca8afec98aafc..d7a87ea7d4eec 100644 --- a/dev-support/docker/Dockerfile_rockylinux_8 +++ b/dev-support/docker/Dockerfile_rockylinux_8 @@ -117,7 +117,6 @@ RUN pkg-resolver/install-maven.sh rockylinux:8 RUN pkg-resolver/install-boost.sh rockylinux:8 RUN pkg-resolver/install-spotbugs.sh rockylinux:8 RUN pkg-resolver/install-protobuf.sh rockylinux:8 -RUN pkg-resolver/install-zstandard.sh rockylinux:8 RUN pkg-resolver/install-intel-isa-l.sh rockylinux:8 RUN pkg-resolver/install-common-pkgs.sh diff --git a/dev-support/docker/pkg-resolver/install-zstandard.sh b/dev-support/docker/pkg-resolver/install-zstandard.sh deleted file mode 100644 index 3aafd469d2be3..0000000000000 --- a/dev-support/docker/pkg-resolver/install-zstandard.sh +++ /dev/null @@ -1,53 +0,0 @@ -#!/usr/bin/env bash - -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -if [ $# -lt 1 ]; then - echo "ERROR: Need at least 1 argument, $# were provided" - exit 1 -fi - -pkg-resolver/check_platform.py "$1" -if [ $? -eq 1 ]; then - echo "ERROR: Unsupported platform $1" - exit 1 -fi - -default_version="1.4.9" -version_to_install=$default_version -if [ -n "$2" ]; then - version_to_install="$2" -fi - -if [ "$version_to_install" != "1.4.9" ]; then - echo "WARN: Don't know how to install version $version_to_install, installing the default version $default_version instead" - version_to_install=$default_version -fi - -if [ "$version_to_install" == "1.4.9" ]; then - # hadolint ignore=DL3003 - mkdir -p /opt/zstd /tmp/zstd && - curl -L -s -S https://github.com/facebook/zstd/archive/refs/tags/v1.4.9.tar.gz -o /tmp/zstd/v1.4.9.tar.gz && - tar xzf /tmp/zstd/v1.4.9.tar.gz --strip-components 1 -C /opt/zstd && - cd /opt/zstd || exit && - make "-j$(nproc)" && - make install && - cd /root || exit -else - echo "ERROR: Don't know how to install version $version_to_install" - exit 1 -fi diff --git a/dev-support/docker/pkg-resolver/packages.json b/dev-support/docker/pkg-resolver/packages.json index fc80db752e776..36d0d56e45909 100644 --- a/dev-support/docker/pkg-resolver/packages.json +++ b/dev-support/docker/pkg-resolver/packages.json @@ -222,19 +222,15 @@ }, "zlib": { "debian:12": [ - "libzstd-dev", "zlib1g-dev" ], "debian:13": [ - "libzstd-dev", "zlib1g-dev" ], "ubuntu:noble": [ - "libzstd-dev", "zlib1g-dev" ], "ubuntu:noble::arch64": [ - "libzstd-dev", "zlib1g-dev" ], "rockylinux:8": [ diff --git a/hadoop-client-modules/hadoop-client-api/pom.xml b/hadoop-client-modules/hadoop-client-api/pom.xml index 5fe92472052fc..cf0d0378af10c 100644 --- a/hadoop-client-modules/hadoop-client-api/pom.xml +++ b/hadoop-client-modules/hadoop-client-api/pom.xml @@ -67,13 +67,17 @@ - + org.xerial.snappy snappy-java + + com.github.luben + zstd-jni + @@ -106,8 +110,9 @@ org.apache.hadoop:* - + org.xerial.snappy:* + com.github.luben:zstd-jni @@ -173,6 +178,9 @@ com/sun/management/**/* com/ibm/security/* com/ibm/security/**/* + + com/github/luben/zstd/* + com/github/luben/zstd/**/* diff --git a/hadoop-client-modules/hadoop-client-check-invariants/pom.xml b/hadoop-client-modules/hadoop-client-check-invariants/pom.xml index 0b085b380a25c..9f0c004e0d1b7 100644 --- a/hadoop-client-modules/hadoop-client-check-invariants/pom.xml +++ b/hadoop-client-modules/hadoop-client-check-invariants/pom.xml @@ -92,8 +92,9 @@ org.glassfish.jersey:* org.bouncycastle:* - + org.xerial.snappy:* + com.github.luben:zstd-jni diff --git a/hadoop-client-modules/hadoop-client-check-test-invariants/pom.xml b/hadoop-client-modules/hadoop-client-check-test-invariants/pom.xml index faf31d1d36515..d1ba9cd0e5311 100644 --- a/hadoop-client-modules/hadoop-client-check-test-invariants/pom.xml +++ b/hadoop-client-modules/hadoop-client-check-test-invariants/pom.xml @@ -98,8 +98,9 @@ com.google.code.findbugs:jsr305 org.bouncycastle:* - + org.xerial.snappy:* + com.github.luben:zstd-jni org.ehcache:* org.glassfish.jersey:* diff --git a/hadoop-client-modules/hadoop-client-minicluster/pom.xml b/hadoop-client-modules/hadoop-client-minicluster/pom.xml index 4ba6f5641f2ee..8e4f971cc6a8c 100644 --- a/hadoop-client-modules/hadoop-client-minicluster/pom.xml +++ b/hadoop-client-modules/hadoop-client-minicluster/pom.xml @@ -46,6 +46,11 @@ snappy-java runtime + + com.github.luben + zstd-jni + runtime + org.apache.hadoop hadoop-client-runtime @@ -707,8 +712,9 @@ org.bouncycastle:* - + org.xerial.snappy:* + com.github.luben:zstd-jni org.glassfish.jersey:* @@ -947,6 +953,9 @@ com/sun/management/**/* com/ibm/security/* com/ibm/security/**/* + + com/github/luben/zstd/* + com/github/luben/zstd/**/* diff --git a/hadoop-client-modules/hadoop-client-runtime/pom.xml b/hadoop-client-modules/hadoop-client-runtime/pom.xml index ec43b30d82d38..d0cf42caf1868 100644 --- a/hadoop-client-modules/hadoop-client-runtime/pom.xml +++ b/hadoop-client-modules/hadoop-client-runtime/pom.xml @@ -66,6 +66,11 @@ snappy-java runtime + + com.github.luben + zstd-jni + runtime + @@ -170,6 +175,7 @@ org.bouncycastle:* org.xerial.snappy:* + com.github.luben:zstd-jni org.jetbrains.kotlin:* org.glassfish.jersey.test-framework:* @@ -320,6 +326,9 @@ com/sun/management/**/* com/ibm/security/* com/ibm/security/**/* + + com/github/luben/zstd/* + com/github/luben/zstd/**/* diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml index 492ecc2ffd71f..793fca4186431 100644 --- a/hadoop-common-project/hadoop-common/pom.xml +++ b/hadoop-common-project/hadoop-common/pom.xml @@ -370,6 +370,11 @@ lz4-java provided + + com.github.luben + zstd-jni + compile + org.junit.jupiter junit-jupiter-api @@ -696,10 +701,6 @@ false - - - - false @@ -749,10 +750,6 @@ ${project.build.directory}/native/javah ${sun.arch.data.model} ${require.bzip2} - ${require.zstd} - ${zstd.prefix} - ${zstd.lib} - ${zstd.include} ${require.isal} ${isal.prefix} ${isal.lib} @@ -807,11 +804,6 @@ false - - - - false - true @@ -961,10 +953,6 @@ /nologo /p:Configuration=Release /p:OutDir=${project.build.directory}/bin/ - /p:CustomZstdPrefix=${zstd.prefix} - /p:CustomZstdLib=${zstd.lib} - /p:CustomZstdInclude=${zstd.include} - /p:RequireZstd=${require.zstd} /p:CustomOpensslPrefix=${openssl.prefix} /p:CustomOpensslLib=${openssl.lib} /p:CustomOpensslInclude=${openssl.include} @@ -989,10 +977,6 @@ /nologo /p:Configuration=Release /p:OutDir=${project.build.directory}/bin/ - /p:CustomZstdPrefix=${zstd.prefix} - /p:CustomZstdLib=${zstd.lib} - /p:CustomZstdInclude=${zstd.include} - /p:RequireZstd=${require.zstd} /p:CustomOpensslPrefix=${openssl.prefix} /p:CustomOpensslLib=${openssl.lib} /p:CustomOpensslInclude=${openssl.include} diff --git a/hadoop-common-project/hadoop-common/src/CMakeLists.txt b/hadoop-common-project/hadoop-common/src/CMakeLists.txt index 8bf94acaad0ae..5955e5e7c49ca 100644 --- a/hadoop-common-project/hadoop-common/src/CMakeLists.txt +++ b/hadoop-common-project/hadoop-common/src/CMakeLists.txt @@ -67,33 +67,6 @@ else() endif() set(CMAKE_FIND_LIBRARY_SUFFIXES ${STORED_CMAKE_FIND_LIBRARY_SUFFIXES}) -# Require zstandard -SET(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES}) -hadoop_set_find_shared_library_version("1") -find_library(ZSTD_LIBRARY - NAMES zstd - PATHS ${CUSTOM_ZSTD_PREFIX} ${CUSTOM_ZSTD_PREFIX}/lib - ${CUSTOM_ZSTD_PREFIX}/lib64 ${CUSTOM_ZSTD_LIB}) -SET(CMAKE_FIND_LIBRARY_SUFFIXES ${STORED_CMAKE_FIND_LIBRARY_SUFFIXES}) -find_path(ZSTD_INCLUDE_DIR - NAMES zstd.h - PATHS ${CUSTOM_ZSTD_PREFIX} ${CUSTOM_ZSTD_PREFIX}/include - ${CUSTOM_ZSTD_INCLUDE}) -if (ZSTD_LIBRARY AND ZSTD_INCLUDE_DIR) - GET_FILENAME_COMPONENT(HADOOP_ZSTD_LIBRARY ${ZSTD_LIBRARY} NAME) - set(ZSTD_SOURCE_FILES - "${SRC}/io/compress/zstd/ZStandardCompressor.c" - "${SRC}/io/compress/zstd/ZStandardDecompressor.c") - set(REQUIRE_ZSTD ${REQUIRE_ZSTD}) # Stop warning about unused variable. - message(STATUS "Found ZStandard: ${ZSTD_LIBRARY}") -else () - set(ZSTD_INCLUDE_DIR "") - set(ZSTD_SOURCE_FILES "") - IF(REQUIRE_ZSTD) - MESSAGE(FATAL_ERROR "Required zstandard library could not be found. ZSTD_LIBRARY=${ZSTD_LIBRARY}, ZSTD_INCLUDE_DIR=${ZSTD_INCLUDE_DIR}, CUSTOM_ZSTD_INCLUDE_DIR=${CUSTOM_ZSTD_INCLUDE_DIR}, CUSTOM_ZSTD_PREFIX=${CUSTOM_ZSTD_PREFIX}, CUSTOM_ZSTD_INCLUDE=${CUSTOM_ZSTD_INCLUDE}") - ENDIF(REQUIRE_ZSTD) -endif () - #Require ISA-L set(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES}) hadoop_set_find_shared_library_version("2") @@ -229,7 +202,6 @@ include_directories( ${ZLIB_INCLUDE_DIRS} ${BZIP2_INCLUDE_DIR} ${ISAL_INCLUDE_DIR} - ${ZSTD_INCLUDE_DIR} ${OPENSSL_INCLUDE_DIR} ${SRC}/util ) @@ -239,7 +211,6 @@ set(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE) hadoop_add_dual_library(hadoop main/native/src/exception.c ${ISAL_SOURCE_FILES} - ${ZSTD_SOURCE_FILES} ${OPENSSL_SOURCE_FILES} ${SRC}/io/compress/zlib/ZlibCompressor.c ${SRC}/io/compress/zlib/ZlibDecompressor.c diff --git a/hadoop-common-project/hadoop-common/src/config.h.cmake b/hadoop-common-project/hadoop-common/src/config.h.cmake index 7e23a5df3281c..7dea92b0650f8 100644 --- a/hadoop-common-project/hadoop-common/src/config.h.cmake +++ b/hadoop-common-project/hadoop-common/src/config.h.cmake @@ -21,7 +21,6 @@ #cmakedefine HADOOP_ZLIB_LIBRARY "@HADOOP_ZLIB_LIBRARY@" #cmakedefine HADOOP_BZIP2_LIBRARY "@HADOOP_BZIP2_LIBRARY@" #cmakedefine HADOOP_SNAPPY_LIBRARY "@HADOOP_SNAPPY_LIBRARY@" -#cmakedefine HADOOP_ZSTD_LIBRARY "@HADOOP_ZSTD_LIBRARY@" #cmakedefine HADOOP_OPENSSL_LIBRARY "@HADOOP_OPENSSL_LIBRARY@" #cmakedefine HADOOP_ISAL_LIBRARY "@HADOOP_ISAL_LIBRARY@" #cmakedefine HADOOP_PMDK_LIBRARY "@HADOOP_PMDK_LIBRARY@" diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java index 139e81eb73cc2..7b7ad69014caa 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/ZStandardCodec.java @@ -23,7 +23,6 @@ import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.io.compress.zstd.ZStandardCompressor; import org.apache.hadoop.io.compress.zstd.ZStandardDecompressor; -import org.apache.hadoop.util.NativeCodeLoader; import java.io.IOException; import java.io.InputStream; @@ -59,30 +58,8 @@ public Configuration getConf() { return conf; } - public static void checkNativeCodeLoaded() { - if (!NativeCodeLoader.isNativeCodeLoaded() || - !NativeCodeLoader.buildSupportsZstd()) { - throw new RuntimeException("native zStandard library " - + "not available: this version of libhadoop was built " - + "without zstd support."); - } - if (!ZStandardCompressor.isNativeCodeLoaded()) { - throw new RuntimeException("native zStandard library not " - + "available: ZStandardCompressor has not been loaded."); - } - if (!ZStandardDecompressor.isNativeCodeLoaded()) { - throw new RuntimeException("native zStandard library not " - + "available: ZStandardDecompressor has not been loaded."); - } - } - - public static boolean isNativeCodeLoaded() { - return ZStandardCompressor.isNativeCodeLoaded() - && ZStandardDecompressor.isNativeCodeLoaded(); - } - public static String getLibraryName() { - return ZStandardCompressor.getLibraryName(); + return "zstd-jni"; } public static int getCompressionLevel(Configuration conf) { @@ -121,8 +98,7 @@ private static int getBufferSize(Configuration conf) { @Override public CompressionOutputStream createOutputStream(OutputStream out) throws IOException { - return Util. - createOutputStreamWithCodecPool(this, conf, out); + return Util.createOutputStreamWithCodecPool(this, conf, out); } /** @@ -138,9 +114,7 @@ public CompressionOutputStream createOutputStream(OutputStream out) public CompressionOutputStream createOutputStream(OutputStream out, Compressor compressor) throws IOException { - checkNativeCodeLoaded(); - return new CompressorStream(out, compressor, - getCompressionBufferSize(conf)); + return new CompressorStream(out, compressor, getCompressionBufferSize(conf)); } /** @@ -150,7 +124,6 @@ public CompressionOutputStream createOutputStream(OutputStream out, */ @Override public Class getCompressorType() { - checkNativeCodeLoaded(); return ZStandardCompressor.class; } @@ -161,7 +134,6 @@ public Class getCompressorType() { */ @Override public Compressor createCompressor() { - checkNativeCodeLoaded(); return new ZStandardCompressor( getCompressionLevel(conf), getCompressionBufferSize(conf)); } @@ -178,8 +150,7 @@ public Compressor createCompressor() { @Override public CompressionInputStream createInputStream(InputStream in) throws IOException { - return Util. - createInputStreamWithCodecPool(this, conf, in); + return Util.createInputStreamWithCodecPool(this, conf, in); } /** @@ -195,7 +166,6 @@ public CompressionInputStream createInputStream(InputStream in) public CompressionInputStream createInputStream(InputStream in, Decompressor decompressor) throws IOException { - checkNativeCodeLoaded(); return new DecompressorStream(in, decompressor, getDecompressionBufferSize(conf)); } @@ -208,7 +178,6 @@ public CompressionInputStream createInputStream(InputStream in, */ @Override public Class getDecompressorType() { - checkNativeCodeLoaded(); return ZStandardDecompressor.class; } @@ -219,7 +188,6 @@ public Class getDecompressorType() { */ @Override public Decompressor createDecompressor() { - checkNativeCodeLoaded(); return new ZStandardDecompressor(getDecompressionBufferSize(conf)); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java index a77b59640cdaf..3906b668b011f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java @@ -18,13 +18,15 @@ package org.apache.hadoop.io.compress.zstd; +import com.github.luben.zstd.EndDirective; +import com.github.luben.zstd.ZstdCompressCtx; +import com.github.luben.zstd.ZstdOutputStream; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.compress.Compressor; import org.apache.hadoop.io.compress.ZStandardCodec; -import org.apache.hadoop.util.NativeCodeLoader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,15 +34,14 @@ import java.nio.ByteBuffer; /** - * A {@link Compressor} based on the zStandard compression algorithm. - * https://github.com/facebook/zstd + * A {@link Compressor} based on the Zstandard compression algorithm, + * backed by the zstd-jni library. */ public class ZStandardCompressor implements Compressor { private static final Logger LOG = LoggerFactory.getLogger(ZStandardCompressor.class); - private long stream; private int level; private int directBufferSize; private byte[] userBuf = null; @@ -53,26 +54,11 @@ public class ZStandardCompressor implements Compressor { private long bytesRead = 0; private long bytesWritten = 0; - private static boolean nativeZStandardLoaded = false; - - static { - if (NativeCodeLoader.isNativeCodeLoaded()) { - try { - // Initialize the native library - initIDs(); - nativeZStandardLoaded = true; - } catch (Throwable t) { - LOG.warn("Error loading zstandard native libraries: " + t); - } - } - } - - public static boolean isNativeCodeLoaded() { - return nativeZStandardLoaded; - } + private ZstdCompressCtx zstdJniCtx = null; public static int getRecommendedBufferSize() { - return getStreamSize(); + // zstd-jni recommended output size for streaming (~128 KB) + return (int) ZstdOutputStream.recommendedCOutSize(); } @VisibleForTesting @@ -94,11 +80,11 @@ public ZStandardCompressor(int level, int bufferSize) { @VisibleForTesting ZStandardCompressor(int level, int inputBufferSize, int outputBufferSize) { this.level = level; - stream = create(); - this.directBufferSize = outputBufferSize; + zstdJniCtx = new ZstdCompressCtx(); uncompressedDirectBuf = ByteBuffer.allocateDirect(inputBufferSize); + directBufferSize = outputBufferSize; compressedDirectBuf = ByteBuffer.allocateDirect(outputBufferSize); - compressedDirectBuf.position(outputBufferSize); + compressedDirectBuf.position(directBufferSize); reset(); } @@ -210,30 +196,49 @@ public int compress(byte[] b, int off, int len) throws IOException { return n; } - // Re-initialize the output direct buffer - compressedDirectBuf.rewind(); - compressedDirectBuf.limit(directBufferSize); + boolean allConsumed = (uncompressedDirectBufLen - uncompressedDirectBufOff <= 0); + // Use END only when finish=true, no more user data, and all direct-buffer + // data consumed (mirrors ZSTD_endStream); otherwise FLUSH (mirrors + // ZSTD_compressStream + ZSTD_flushStream). + boolean shouldEnd = finish && userBufLen == 0 && allConsumed; + if (!allConsumed || shouldEnd) { + // Re-initialize the output direct buffer + compressedDirectBuf.rewind(); + compressedDirectBuf.limit(directBufferSize); + + uncompressedDirectBuf.position(uncompressedDirectBufOff); + uncompressedDirectBuf.limit(uncompressedDirectBufLen); + compressedDirectBuf.position(0); + compressedDirectBuf.limit(directBufferSize); + + EndDirective endOp = shouldEnd ? EndDirective.END : EndDirective.FLUSH; + boolean done = zstdJniCtx.compressDirectByteBufferStream( + compressedDirectBuf, uncompressedDirectBuf, endOp); + + int newOff = uncompressedDirectBuf.position(); + n = compressedDirectBuf.position(); + + bytesRead += newOff - uncompressedDirectBufOff; + bytesWritten += n; + + uncompressedDirectBufOff = newOff; + if (uncompressedDirectBufLen - uncompressedDirectBufOff <= 0) { + keepUncompressedBuf = false; + uncompressedDirectBuf.clear(); + uncompressedDirectBufOff = 0; + uncompressedDirectBufLen = 0; + } else { + keepUncompressedBuf = true; + } - // Compress data - n = deflateBytesDirect( - uncompressedDirectBuf, - uncompressedDirectBufOff, - uncompressedDirectBufLen, - compressedDirectBuf, - directBufferSize - ); - compressedDirectBuf.limit(n); - - // Check if we have consumed all input buffer - if (uncompressedDirectBufLen - uncompressedDirectBufOff <= 0) { - // consumed all input buffer - keepUncompressedBuf = false; - uncompressedDirectBuf.clear(); - uncompressedDirectBufOff = 0; - uncompressedDirectBufLen = 0; + if (endOp == EndDirective.END && done) { + finished = true; + } + + compressedDirectBuf.position(0); + compressedDirectBuf.limit(n); } else { - // did not consume all input buffer - keepUncompressedBuf = true; + n = 0; } // Get at most 'len' bytes @@ -267,7 +272,8 @@ public long getBytesRead() { @Override public void reset() { checkStream(); - init(level, stream); + zstdJniCtx.reset(); + zstdJniCtx.setLevel(level); finish = false; finished = false; bytesRead = 0; @@ -284,24 +290,15 @@ public void reset() { @Override public void end() { - if (stream != 0) { - end(stream); - stream = 0; + if (zstdJniCtx != null) { + zstdJniCtx.close(); + zstdJniCtx = null; } } private void checkStream() { - if (stream == 0) { + if (zstdJniCtx == null) { throw new NullPointerException(); } } - - private native static long create(); - private native static void init(int level, long stream); - private native int deflateBytesDirect(ByteBuffer src, int srcOffset, - int srcLen, ByteBuffer dst, int dstLen); - private native static int getStreamSize(); - private native static void end(long strm); - private native static void initIDs(); - public native static String getLibraryName(); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java index 792547a62faea..92263691f54e4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java @@ -18,9 +18,10 @@ package org.apache.hadoop.io.compress.zstd; +import com.github.luben.zstd.ZstdDecompressCtx; +import com.github.luben.zstd.ZstdInputStream; import org.apache.hadoop.io.compress.Decompressor; import org.apache.hadoop.io.compress.DirectDecompressor; -import org.apache.hadoop.util.NativeCodeLoader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,14 +29,13 @@ import java.nio.ByteBuffer; /** - * A {@link Decompressor} based on the zStandard compression algorithm. - * https://github.com/facebook/zstd + * A {@link Decompressor} based on the Zstandard compression algorithm. + * backed by the zstd-jni library. */ public class ZStandardDecompressor implements Decompressor { private static final Logger LOG = LoggerFactory.getLogger(ZStandardDecompressor.class); - private long stream; private int directBufferSize; private ByteBuffer compressedDirectBuf = null; private int compressedDirectBufOff, bytesInCompressedBuffer; @@ -45,30 +45,16 @@ public class ZStandardDecompressor implements Decompressor { private boolean finished; private int remaining = 0; - private static boolean nativeZStandardLoaded = false; - - static { - if (NativeCodeLoader.isNativeCodeLoaded()) { - try { - // Initialize the native library - initIDs(); - nativeZStandardLoaded = true; - } catch (Throwable t) { - LOG.warn("Error loading zstandard native libraries: " + t); - } - } - } - - public static boolean isNativeCodeLoaded() { - return nativeZStandardLoaded; - } + /** zstd-jni decompression context; non-null only when using zstd-jni backend. */ + private ZstdDecompressCtx zstdJniCtx = null; public static int getRecommendedBufferSize() { - return getStreamSize(); + // zstd-jni recommended input size for streaming (~128 KB) + return (int) ZstdInputStream.recommendedDInSize(); } public ZStandardDecompressor() { - this(getStreamSize()); + this(getRecommendedBufferSize()); } /** @@ -77,10 +63,10 @@ public ZStandardDecompressor() { */ public ZStandardDecompressor(int bufferSize) { this.directBufferSize = bufferSize; + zstdJniCtx = new ZstdDecompressCtx(); compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); uncompressedDirectBuf.position(directBufferSize); - stream = create(); reset(); } @@ -184,22 +170,34 @@ public int decompress(byte[] b, int off, int len) uncompressedDirectBuf.rewind(); uncompressedDirectBuf.limit(directBufferSize); - // Decompress data - n = inflateBytesDirect( - compressedDirectBuf, - compressedDirectBufOff, - bytesInCompressedBuffer, - uncompressedDirectBuf, - 0, - directBufferSize - ); + if (compressedDirectBufOff < bytesInCompressedBuffer) { + compressedDirectBuf.position(compressedDirectBufOff); + compressedDirectBuf.limit(bytesInCompressedBuffer); + uncompressedDirectBuf.position(0); + uncompressedDirectBuf.limit(directBufferSize); + + boolean done = zstdJniCtx.decompressDirectByteBufferStream( + uncompressedDirectBuf, compressedDirectBuf); + + compressedDirectBufOff = compressedDirectBuf.position(); + remaining = bytesInCompressedBuffer - compressedDirectBufOff; + n = uncompressedDirectBuf.position(); + + // Mark finished only when the frame is done AND no more bytes remain + // (compressedDirectBuf may hold additional concatenated frames). + if (done && remaining == 0) { + finished = true; + } else if (remaining > 0 && finished) { + finished = false; + } - // Set the finished to false when compressedDirectBuf still - // contains some bytes. - if (remaining > 0 && finished) { - finished = false; + // Restore limit so setInputFromSavedData() can rewind+put on next call. + compressedDirectBuf.limit(directBufferSize); + } else { + n = 0; } + uncompressedDirectBuf.rewind(); uncompressedDirectBuf.limit(n); // Get at most 'len' bytes @@ -226,7 +224,7 @@ public int getRemaining() { @Override public void reset() { checkStream(); - init(stream); + zstdJniCtx.reset(); remaining = 0; finished = false; compressedDirectBufOff = 0; @@ -239,9 +237,9 @@ public void reset() { @Override public void end() { - if (stream != 0) { - free(stream); - stream = 0; + if (zstdJniCtx != null) { + zstdJniCtx.close(); + zstdJniCtx = null; } } @@ -251,8 +249,8 @@ protected void finalize() { } private void checkStream() { - if (stream == 0) { - throw new NullPointerException("Stream not initialized"); + if (zstdJniCtx == null) { + throw new NullPointerException("Decompression context not initialized"); } } @@ -262,35 +260,24 @@ private int populateUncompressedBuffer(byte[] b, int off, int len, int n) { return n; } - private native static void initIDs(); - private native static long create(); - private native static void init(long stream); - private native int inflateBytesDirect(ByteBuffer src, int srcOffset, - int srcLen, ByteBuffer dst, int dstOffset, int dstLen); - private native static void free(long strm); - private native static int getStreamSize(); - - int inflateDirect(ByteBuffer src, ByteBuffer dst) throws IOException { - assert - (this instanceof ZStandardDecompressor.ZStandardDirectDecompressor); - - int originalPosition = dst.position(); - int n = inflateBytesDirect( - src, src.position(), src.limit(), dst, dst.position(), - dst.limit() - ); - dst.position(originalPosition + n); - if (bytesInCompressedBuffer > 0) { - src.position(compressedDirectBufOff); + int inflateDirect(ByteBuffer src, ByteBuffer dst) { + assert (this instanceof ZStandardDecompressor.ZStandardDirectDecompressor); + + // zstd-jni: use streaming decompression directly on the provided buffers + int origDstPos = dst.position(); + boolean done = zstdJniCtx.decompressDirectByteBufferStream(dst, src); + if (done) { + finished = true; + remaining = 0; } else { - src.position(src.limit()); + remaining = src.limit() - src.position(); } - return n; + return dst.position() - origDstPos; } /** * A {@link DirectDecompressor} for ZStandard - * https://github.com/facebook/zstd. + * Zstandard */ public static class ZStandardDirectDecompressor extends ZStandardDecompressor implements DirectDecompressor { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/package-info.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/package-info.java index 7214bf8582b62..70cadb3093d2f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/package-info.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/package-info.java @@ -17,10 +17,10 @@ */ /** - * Implementation of compression/decompression based on the zStandard + * Implementation of compression/decompression based on the Zstandard * compression algorithm. * - * @see zStandard + * @see Zstandard */ @InterfaceAudience.Private @InterfaceStability.Unstable diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java index b5550f58ae218..042305dcc034e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java @@ -81,13 +81,6 @@ public static boolean isNativeCodeLoaded() { */ public static native boolean buildSupportsIsal(); - /** - * Returns true only if this build was compiled with support for ZStandard. - * - * @return if this build was compiled with support for ZStandard true,not false. - */ - public static native boolean buildSupportsZstd(); - /** * Returns true only if this build was compiled with support for openssl. * diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java index 9843a9d4057dd..50c4168715383 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java @@ -18,7 +18,6 @@ package org.apache.hadoop.util; -import org.apache.hadoop.io.compress.ZStandardCodec; import org.apache.hadoop.io.erasurecode.ErasureCodeNative; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.OpensslCipher; @@ -67,7 +66,6 @@ public static void main(String[] args) { boolean nativeHadoopLoaded = NativeCodeLoader.isNativeCodeLoaded(); boolean zlibLoaded = false; boolean isalLoaded = false; - boolean zStdLoaded = false; boolean pmdkLoaded = false; boolean bzip2Loaded = Bzip2Factory.isNativeBzip2Loaded(conf); boolean openSslLoaded = false; @@ -78,7 +76,6 @@ public static void main(String[] args) { String zlibLibraryName = ""; String isalDetail = ""; String pmdkDetail = ""; - String zstdLibraryName = ""; String bzip2LibraryName = ""; String winutilsPath = null; @@ -88,11 +85,6 @@ public static void main(String[] args) { if (zlibLoaded) { zlibLibraryName = ZlibFactory.getLibraryName(); } - zStdLoaded = NativeCodeLoader.buildSupportsZstd() && - ZStandardCodec.isNativeCodeLoaded(); - if (zStdLoaded && NativeCodeLoader.buildSupportsZstd()) { - zstdLibraryName = ZStandardCodec.getLibraryName(); - } isalDetail = ErasureCodeNative.getLoadingFailureReason(); if (isalDetail != null) { @@ -137,7 +129,6 @@ public static void main(String[] args) { System.out.println("Native library checking:"); System.out.printf("hadoop: %b %s%n", nativeHadoopLoaded, hadoopLibraryName); System.out.printf("zlib: %b %s%n", zlibLoaded, zlibLibraryName); - System.out.printf("zstd : %b %s%n", zStdLoaded, zstdLibraryName); System.out.printf("bzip2: %b %s%n", bzip2Loaded, bzip2LibraryName); System.out.printf("openssl: %b %s%n", openSslLoaded, openSslDetail); System.out.printf("ISA-L: %b %s%n", isalLoaded, isalDetail); @@ -148,8 +139,7 @@ public static void main(String[] args) { } if ((!nativeHadoopLoaded) || (Shell.WINDOWS && (!winutilsExists)) || - (checkAll && !(zlibLoaded && bzip2Loaded - && isalLoaded && zStdLoaded))) { + (checkAll && !(zlibLoaded && bzip2Loaded && isalLoaded))) { // return 1 to indicated check failed ExitUtil.terminate(1); } diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c deleted file mode 100644 index 6581f292b4a00..0000000000000 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c +++ /dev/null @@ -1,262 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -#include "org_apache_hadoop_io_compress_zstd.h" - -#if defined HADOOP_ZSTD_LIBRARY - -#include -#include -#include - -#ifdef UNIX -#include -#include "config.h" -#endif - -#include "org_apache_hadoop_io_compress_zstd_ZStandardCompressor.h" - -static jfieldID ZStandardCompressor_stream; -static jfieldID ZStandardCompressor_uncompressedDirectBufOff; -static jfieldID ZStandardCompressor_uncompressedDirectBufLen; -static jfieldID ZStandardCompressor_directBufferSize; -static jfieldID ZStandardCompressor_finish; -static jfieldID ZStandardCompressor_finished; -static jfieldID ZStandardCompressor_bytesWritten; -static jfieldID ZStandardCompressor_bytesRead; - -#ifdef UNIX -static size_t (*dlsym_ZSTD_CStreamInSize)(void); -static size_t (*dlsym_ZSTD_CStreamOutSize)(void); -static ZSTD_CStream* (*dlsym_ZSTD_createCStream)(void); -static size_t (*dlsym_ZSTD_initCStream)(ZSTD_CStream*, int); -static size_t (*dlsym_ZSTD_freeCStream)(ZSTD_CStream*); -static size_t (*dlsym_ZSTD_compressStream)(ZSTD_CStream*, ZSTD_outBuffer*, ZSTD_inBuffer*); -static size_t (*dlsym_ZSTD_endStream)(ZSTD_CStream*, ZSTD_outBuffer*); -static size_t (*dlsym_ZSTD_flushStream)(ZSTD_CStream*, ZSTD_outBuffer*); -static unsigned (*dlsym_ZSTD_isError)(size_t); -static const char * (*dlsym_ZSTD_getErrorName)(size_t); -#endif - -#ifdef WINDOWS -typedef size_t (__cdecl *__dlsym_ZSTD_CStreamInSize)(void); -typedef size_t (__cdecl *__dlsym_ZSTD_CStreamOutSize)(void); -typedef ZSTD_CStream* (__cdecl *__dlsym_ZSTD_createCStream)(void); -typedef size_t (__cdecl *__dlsym_ZSTD_initCStream)(ZSTD_CStream*, int); -typedef size_t (__cdecl *__dlsym_ZSTD_freeCStream)(ZSTD_CStream*); -typedef size_t (__cdecl *__dlsym_ZSTD_compressStream)(ZSTD_CStream*, ZSTD_outBuffer*, ZSTD_inBuffer*); -typedef size_t (__cdecl *__dlsym_ZSTD_endStream)(ZSTD_CStream*, ZSTD_outBuffer*); -typedef size_t (__cdecl *__dlsym_ZSTD_flushStream)(ZSTD_CStream*, ZSTD_outBuffer*); -typedef unsigned (__cdecl *__dlsym_ZSTD_isError)(size_t); -typedef const char * (__cdecl *__dlsym_ZSTD_getErrorName)(size_t); - -static __dlsym_ZSTD_CStreamInSize dlsym_ZSTD_CStreamInSize; -static __dlsym_ZSTD_CStreamOutSize dlsym_ZSTD_CStreamOutSize; -static __dlsym_ZSTD_createCStream dlsym_ZSTD_createCStream; -static __dlsym_ZSTD_initCStream dlsym_ZSTD_initCStream; -static __dlsym_ZSTD_freeCStream dlsym_ZSTD_freeCStream; -static __dlsym_ZSTD_compressStream dlsym_ZSTD_compressStream; -static __dlsym_ZSTD_endStream dlsym_ZSTD_endStream; -static __dlsym_ZSTD_flushStream dlsym_ZSTD_flushStream; -static __dlsym_ZSTD_isError dlsym_ZSTD_isError; -static __dlsym_ZSTD_getErrorName dlsym_ZSTD_getErrorName; -#endif - -// Load the libzstd.so from disk -JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_initIDs (JNIEnv *env, jclass clazz) { -#ifdef UNIX - // Load libzstd.so - void *libzstd = dlopen(HADOOP_ZSTD_LIBRARY, RTLD_LAZY | RTLD_GLOBAL); - if (!libzstd) { - char* msg = (char*)malloc(10000); - snprintf(msg, 10000, "%s (%s)!", "Cannot load " HADOOP_ZSTD_LIBRARY, dlerror()); - THROW(env, "java/lang/InternalError", msg); - return; - } -#endif - -#ifdef WINDOWS - HMODULE libzstd = LoadLibrary(HADOOP_ZSTD_LIBRARY); - if (!libzstd) { - THROW(env, "java/lang/UnsatisfiedLinkError", "Cannot load zstd.dll"); - return; - } -#endif - -#ifdef UNIX - // load dynamic symbols - dlerror(); - LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_CStreamInSize, env, libzstd, "ZSTD_CStreamInSize"); - LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_CStreamOutSize, env, libzstd, "ZSTD_CStreamOutSize"); - LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_createCStream, env, libzstd, "ZSTD_createCStream"); - LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_initCStream, env, libzstd, "ZSTD_initCStream"); - LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_freeCStream, env, libzstd, "ZSTD_freeCStream"); - LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_compressStream, env, libzstd, "ZSTD_compressStream"); - LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_endStream, env, libzstd, "ZSTD_endStream"); - LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream"); - LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError"); - LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName"); -#endif - -#ifdef WINDOWS - LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_CStreamInSize, dlsym_ZSTD_CStreamInSize, env, libzstd, "ZSTD_CStreamInSize"); - LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_CStreamOutSize, dlsym_ZSTD_CStreamOutSize, env, libzstd, "ZSTD_CStreamOutSize"); - LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_createCStream, dlsym_ZSTD_createCStream, env, libzstd, "ZSTD_createCStream"); - LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_initCStream, dlsym_ZSTD_initCStream, env, libzstd, "ZSTD_initCStream"); - LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_freeCStream, dlsym_ZSTD_freeCStream, env, libzstd, "ZSTD_freeCStream"); - LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_compressStream, dlsym_ZSTD_compressStream, env, libzstd, "ZSTD_compressStream"); - LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_endStream, dlsym_ZSTD_endStream, env, libzstd, "ZSTD_endStream"); - LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_flushStream, dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream"); - LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_isError, dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError"); - LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_getErrorName, dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName"); -#endif - - // load fields - ZStandardCompressor_stream = (*env)->GetFieldID(env, clazz, "stream", "J"); - ZStandardCompressor_finish = (*env)->GetFieldID(env, clazz, "finish", "Z"); - ZStandardCompressor_finished = (*env)->GetFieldID(env, clazz, "finished", "Z"); - ZStandardCompressor_uncompressedDirectBufOff = (*env)->GetFieldID(env, clazz, "uncompressedDirectBufOff", "I"); - ZStandardCompressor_uncompressedDirectBufLen = (*env)->GetFieldID(env, clazz, "uncompressedDirectBufLen", "I"); - ZStandardCompressor_directBufferSize = (*env)->GetFieldID(env, clazz, "directBufferSize", "I"); - ZStandardCompressor_bytesRead = (*env)->GetFieldID(env, clazz, "bytesRead", "J"); - ZStandardCompressor_bytesWritten = (*env)->GetFieldID(env, clazz, "bytesWritten", "J"); -} - -// Create the compression stream -JNIEXPORT jlong JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_create (JNIEnv *env, jclass clazz) { - ZSTD_CStream* const stream = dlsym_ZSTD_createCStream(); - if (stream == NULL) { - THROW(env, "java/lang/InternalError", "Error creating the stream"); - return (jlong)0; - } - return (jlong) stream; -} - -// Initialize the compression stream -JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_init (JNIEnv *env, jclass clazz, jint level, jlong stream) { - size_t result = dlsym_ZSTD_initCStream((ZSTD_CStream *) stream, level); - if (dlsym_ZSTD_isError(result)) { - THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result)); - return; - } -} - -// free the compression stream -JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_end (JNIEnv *env, jclass clazz, jlong stream) { - size_t result = dlsym_ZSTD_freeCStream((ZSTD_CStream *) stream); - if (dlsym_ZSTD_isError(result)) { - THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result)); - return; - } -} - -JNIEXPORT jint Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_deflateBytesDirect -(JNIEnv *env, jobject this, jobject uncompressed_direct_buf, jint uncompressed_direct_buf_off, jint uncompressed_direct_buf_len, jobject compressed_direct_buf, jint compressed_direct_buf_len ) { - ZSTD_CStream* const stream = (ZSTD_CStream*) (*env)->GetLongField(env, this, ZStandardCompressor_stream); - if (!stream) { - THROW(env, "java/lang/NullPointerException", NULL); - return (jint)0; - } - - jlong bytes_read = (*env)->GetLongField(env, this, ZStandardCompressor_bytesRead); - jlong bytes_written = (*env)->GetLongField(env, this, ZStandardCompressor_bytesWritten); - jboolean finish = (*env)->GetBooleanField(env, this, ZStandardCompressor_finish); - - // Get the input direct buffer - void * uncompressed_bytes = (*env)->GetDirectBufferAddress(env, uncompressed_direct_buf); - if (!uncompressed_bytes) { - THROW(env, "java/lang/InternalError", "Undefined memory address for uncompressedDirectBuf"); - return (jint) 0; - } - - // Get the output direct buffer - void * compressed_bytes = (*env)->GetDirectBufferAddress(env, compressed_direct_buf); - if (!compressed_bytes) { - THROW(env, "java/lang/InternalError", "Undefined memory address for compressedDirectBuf"); - return (jint) 0; - } - - ZSTD_inBuffer input = { uncompressed_bytes, uncompressed_direct_buf_len, uncompressed_direct_buf_off }; - ZSTD_outBuffer output = { compressed_bytes, compressed_direct_buf_len, 0 }; - - size_t size; - if (uncompressed_direct_buf_len != 0) { - size = dlsym_ZSTD_compressStream(stream, &output, &input); - if (dlsym_ZSTD_isError(size)) { - THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(size)); - return (jint) 0; - } - } - if (finish && input.pos == input.size) { - // end the stream, flush and write the frame epilogue - size = dlsym_ZSTD_endStream(stream, &output); - if (!size) { - (*env)->SetBooleanField(env, this, ZStandardCompressor_finished, JNI_TRUE); - } - } else { - // need to flush the output buffer - // this also updates the output buffer position. - size = dlsym_ZSTD_flushStream(stream, &output); - } - if (dlsym_ZSTD_isError(size)) { - THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(size)); - return (jint) 0; - } - - bytes_read += input.pos - uncompressed_direct_buf_off; - bytes_written += output.pos; - (*env)->SetLongField(env, this, ZStandardCompressor_bytesRead, bytes_read); - (*env)->SetLongField(env, this, ZStandardCompressor_bytesWritten, bytes_written); - - (*env)->SetIntField(env, this, ZStandardCompressor_uncompressedDirectBufOff, input.pos); - (*env)->SetIntField(env, this, ZStandardCompressor_uncompressedDirectBufLen, input.size); - return (jint) output.pos; -} - -JNIEXPORT jstring JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_getLibraryName -(JNIEnv *env, jclass clazz) { -#ifdef UNIX - if (dlsym_ZSTD_isError) { - Dl_info dl_info; - if (dladdr( dlsym_ZSTD_isError, &dl_info)) { - return (*env)->NewStringUTF(env, dl_info.dli_fname); - } - } - return (*env)->NewStringUTF(env, HADOOP_ZSTD_LIBRARY); -#endif -#ifdef WINDOWS - LPWSTR filename = NULL; - GetLibraryName(dlsym_ZSTD_isError, &filename); - if (filename != NULL) { - return (*env)->NewString(env, filename, (jsize) wcslen(filename)); - } else { - return (*env)->NewStringUTF(env, "Unavailable"); - } -#endif -} - -// returns the max size of the recommended input and output buffers -JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_getStreamSize -(JNIEnv *env, jclass clazz) { - int x = (int) dlsym_ZSTD_CStreamInSize(); - int y = (int) dlsym_ZSTD_CStreamOutSize(); - return (x >= y) ? x : y; -} - -#endif //define HADOOP_ZSTD_LIBRARY \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.c deleted file mode 100644 index e28359b526cdb..0000000000000 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.c +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "org_apache_hadoop_io_compress_zstd.h" - -#if defined HADOOP_ZSTD_LIBRARY - -#include -#include -#include - -#ifdef UNIX -#include -#include "config.h" -#endif - -#include "org_apache_hadoop_io_compress_zstd_ZStandardDecompressor.h" - -static jfieldID ZStandardDecompressor_stream; -static jfieldID ZStandardDecompressor_compressedDirectBufOff; -static jfieldID ZStandardDecompressor_bytesInCompressedBuffer; -static jfieldID ZStandardDecompressor_directBufferSize; -static jfieldID ZStandardDecompressor_finished; -static jfieldID ZStandardDecompressor_remaining; - -#ifdef UNIX -static size_t (*dlsym_ZSTD_DStreamOutSize)(void); -static size_t (*dlsym_ZSTD_DStreamInSize)(void); -static ZSTD_DStream* (*dlsym_ZSTD_createDStream)(void); -static size_t (*dlsym_ZSTD_initDStream)(ZSTD_DStream*); -static size_t (*dlsym_ZSTD_freeDStream)(ZSTD_DStream*); -static size_t (*dlsym_ZSTD_resetDStream)(ZSTD_DStream*); -static size_t (*dlsym_ZSTD_decompressStream)(ZSTD_DStream*, ZSTD_outBuffer*, ZSTD_inBuffer*); -static size_t (*dlsym_ZSTD_flushStream)(ZSTD_CStream*, ZSTD_outBuffer*); -static unsigned (*dlsym_ZSTD_isError)(size_t); -static const char * (*dlsym_ZSTD_getErrorName)(size_t); -#endif - -#ifdef WINDOWS -typedef size_t (__cdecl *__dlsym_ZSTD_DStreamOutSize)(void); -typedef size_t (__cdecl *__dlsym_ZSTD_DStreamInSize)(void); -typedef ZSTD_DStream* (__cdecl *__dlsym_ZSTD_createDStream)(void); -typedef size_t (__cdecl *__dlsym_ZSTD_initDStream)(ZSTD_DStream*); -typedef size_t (__cdecl *__dlsym_ZSTD_freeDStream)(ZSTD_DStream*); -typedef size_t (__cdecl *__dlsym_ZSTD_resetDStream)(ZSTD_DStream*); -typedef size_t (__cdecl *__dlsym_ZSTD_decompressStream)(ZSTD_DStream*, ZSTD_outBuffer*, ZSTD_inBuffer*); -typedef size_t (__cdecl *__dlsym_ZSTD_flushStream)(ZSTD_CStream*, ZSTD_outBuffer*); -typedef unsigned (__cdecl *__dlsym_ZSTD_isError)(size_t); -typedef const char * (__cdecl *__dlsym_ZSTD_getErrorName)(size_t); - -static __dlsym_ZSTD_DStreamOutSize dlsym_ZSTD_DStreamOutSize; -static __dlsym_ZSTD_DStreamInSize dlsym_ZSTD_DStreamInSize; -static __dlsym_ZSTD_createDStream dlsym_ZSTD_createDStream; -static __dlsym_ZSTD_initDStream dlsym_ZSTD_initDStream; -static __dlsym_ZSTD_freeDStream dlsym_ZSTD_freeDStream; -static __dlsym_ZSTD_resetDStream dlsym_ZSTD_resetDStream; -static __dlsym_ZSTD_decompressStream dlsym_ZSTD_decompressStream; -static __dlsym_ZSTD_isError dlsym_ZSTD_isError; -static __dlsym_ZSTD_getErrorName dlsym_ZSTD_getErrorName; -static __dlsym_ZSTD_flushStream dlsym_ZSTD_flushStream; -#endif - -JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_initIDs (JNIEnv *env, jclass clazz) { - // Load libzstd.so -#ifdef UNIX - void *libzstd = dlopen(HADOOP_ZSTD_LIBRARY, RTLD_LAZY | RTLD_GLOBAL); - if (!libzstd) { - char* msg = (char*)malloc(1000); - snprintf(msg, 1000, "%s (%s)!", "Cannot load " HADOOP_ZSTD_LIBRARY, dlerror()); - THROW(env, "java/lang/UnsatisfiedLinkError", msg); - return; - } -#endif - -#ifdef WINDOWS - HMODULE libzstd = LoadLibrary(HADOOP_ZSTD_LIBRARY); - if (!libzstd) { - THROW(env, "java/lang/UnsatisfiedLinkError", "Cannot load zstd.dll"); - return; - } -#endif - -#ifdef UNIX - dlerror(); - LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_DStreamOutSize, env, libzstd, "ZSTD_DStreamOutSize"); - LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_DStreamInSize, env, libzstd, "ZSTD_DStreamInSize"); - LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_createDStream, env, libzstd, "ZSTD_createDStream"); - LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_initDStream, env, libzstd, "ZSTD_initDStream"); - LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_freeDStream, env, libzstd, "ZSTD_freeDStream"); - LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_resetDStream, env, libzstd, "ZSTD_resetDStream"); - LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_decompressStream, env, libzstd, "ZSTD_decompressStream"); - LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError"); - LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName"); - LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream"); -#endif - -#ifdef WINDOWS - LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_DStreamOutSize, dlsym_ZSTD_DStreamOutSize, env, libzstd, "ZSTD_DStreamOutSize"); - LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_DStreamInSize, dlsym_ZSTD_DStreamInSize, env, libzstd, "ZSTD_DStreamInSize"); - LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_createDStream, dlsym_ZSTD_createDStream, env, libzstd, "ZSTD_createDStream"); - LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_initDStream, dlsym_ZSTD_initDStream, env, libzstd, "ZSTD_initDStream"); - LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_freeDStream, dlsym_ZSTD_freeDStream, env, libzstd, "ZSTD_freeDStream"); - LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_resetDStream, dlsym_ZSTD_resetDStream, env, libzstd, "ZSTD_resetDStream"); - LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_decompressStream, dlsym_ZSTD_decompressStream, env, libzstd, "ZSTD_decompressStream"); - LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_isError, dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError"); - LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_getErrorName, dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName"); - LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_flushStream, dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream"); -#endif - - ZStandardDecompressor_stream = (*env)->GetFieldID(env, clazz, "stream", "J"); - ZStandardDecompressor_finished = (*env)->GetFieldID(env, clazz, "finished", "Z"); - ZStandardDecompressor_compressedDirectBufOff = (*env)->GetFieldID(env, clazz, "compressedDirectBufOff", "I"); - ZStandardDecompressor_bytesInCompressedBuffer = (*env)->GetFieldID(env, clazz, "bytesInCompressedBuffer", "I"); - ZStandardDecompressor_directBufferSize = (*env)->GetFieldID(env, clazz, "directBufferSize", "I"); - ZStandardDecompressor_remaining = (*env)->GetFieldID(env, clazz, "remaining", "I"); -} - -JNIEXPORT jlong JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_create(JNIEnv *env, jclass clazz) { - ZSTD_DStream * stream = dlsym_ZSTD_createDStream(); - if (stream == NULL) { - THROW(env, "java/lang/InternalError", "Error creating stream"); - return (jlong) 0; - } - return (jlong) stream; -} - -JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_init(JNIEnv *env, jclass clazz, jlong stream) { - size_t result = dlsym_ZSTD_initDStream((ZSTD_DStream *) stream); - if (dlsym_ZSTD_isError(result)) { - THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result)); - return; - } -} - - -JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_free(JNIEnv *env, jclass clazz, jlong stream) { - size_t result = dlsym_ZSTD_freeDStream((ZSTD_DStream *) stream); - if (dlsym_ZSTD_isError(result)) { - THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result)); - return; - } -} - -JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_inflateBytesDirect -(JNIEnv *env, jobject this, jobject compressed_direct_buf, jint compressed_direct_buf_off, jint compressed_direct_buf_len, jobject uncompressed_direct_buf, jint uncompressed_direct_buf_off, jint uncompressed_direct_buf_len) { - ZSTD_DStream *stream = (ZSTD_DStream *) (*env)->GetLongField(env, this, ZStandardDecompressor_stream); - if (!stream) { - THROW(env, "java/lang/NullPointerException", NULL); - return (jint)0; - } - - // Get the input direct buffer - void * compressed_bytes = (*env)->GetDirectBufferAddress(env, compressed_direct_buf); - if (!compressed_bytes) { - THROW(env, "java/lang/InternalError", "Undefined memory address for compressedDirectBuf"); - return (jint) 0; - } - - // Get the output direct buffer - void * uncompressed_bytes = (*env)->GetDirectBufferAddress(env, uncompressed_direct_buf); - if (!uncompressed_bytes) { - THROW(env, "java/lang/InternalError", "Undefined memory address for uncompressedDirectBuf"); - return (jint) 0; - } - uncompressed_bytes = ((char*) uncompressed_bytes) + uncompressed_direct_buf_off; - uncompressed_direct_buf_len -= uncompressed_direct_buf_off; - - ZSTD_inBuffer input = { compressed_bytes, compressed_direct_buf_len, compressed_direct_buf_off }; - ZSTD_outBuffer output = { uncompressed_bytes, uncompressed_direct_buf_len, 0 }; - - size_t const size = dlsym_ZSTD_decompressStream(stream, &output, &input); - - // check for errors - if (dlsym_ZSTD_isError(size)) { - THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(size)); - return (jint) 0; - } - int remaining = input.size - input.pos; - (*env)->SetIntField(env, this, ZStandardDecompressor_remaining, remaining); - - // the entire frame has been decoded - if (size == 0) { - (*env)->SetBooleanField(env, this, ZStandardDecompressor_finished, JNI_TRUE); - size_t result = dlsym_ZSTD_initDStream(stream); - if (dlsym_ZSTD_isError(result)) { - THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result)); - return (jint) 0; - } - } - (*env)->SetIntField(env, this, ZStandardDecompressor_compressedDirectBufOff, input.pos); - (*env)->SetIntField(env, this, ZStandardDecompressor_bytesInCompressedBuffer, input.size); - return (jint) output.pos; -} - -// returns the max size of the recommended input and output buffers -JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_getStreamSize -(JNIEnv *env, jclass clazz) { - int x = (int) dlsym_ZSTD_DStreamInSize(); - int y = (int) dlsym_ZSTD_DStreamOutSize(); - return (x >= y) ? x : y; -} - -#endif //define HADOOP_ZSTD_LIBRARY diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/org_apache_hadoop_io_compress_zstd.h b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/org_apache_hadoop_io_compress_zstd.h deleted file mode 100644 index 78fc0a4a0b327..0000000000000 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/org_apache_hadoop_io_compress_zstd.h +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -#ifndef ORG_APACHE_HADOOP_IO_COMPRESS_ZSTD_ZSTD_H -#define ORG_APACHE_HADOOP_IO_COMPRESS_ZSTD_ZSTD_H - -#include "org_apache_hadoop.h" - -#ifdef UNIX -#include -#endif - -#include -#include -#include - - -#endif //ORG_APACHE_HADOOP_IO_COMPRESS_ZSTD_ZSTD_H diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c index 1bd7fa18bf31b..ae8263aac651d 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/util/NativeCodeLoader.c @@ -39,17 +39,6 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_util_NativeCodeLoader_buildSup #endif } -JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_util_NativeCodeLoader_buildSupportsZstd - (JNIEnv *env, jclass clazz) -{ -#ifdef HADOOP_ZSTD_LIBRARY - return JNI_TRUE; -#else - return JNI_FALSE; -#endif -} - - JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_util_NativeCodeLoader_buildSupportsOpenssl (JNIEnv *env, jclass clazz) { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java index 74b6b473f7ce1..ae7e0fce77367 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java @@ -526,7 +526,6 @@ public void testSequenceFileBZip2Codec() throws IOException, ClassNotFoundExcept @Test @Timeout(value = 20) public void testSequenceFileZStandardCodec() throws Exception { - assumeTrue(ZStandardCodec.isNativeCodeLoaded()); Configuration conf = new Configuration(); sequenceFileCodecTest(conf, 0, "org.apache.hadoop.io.compress.ZStandardCodec", 100); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java index f02a374d3550a..454d680c6026c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCompressionStreamReuse.java @@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assumptions.assumeTrue; public class TestCompressionStreamReuse { private static final Logger LOG = LoggerFactory @@ -62,7 +61,6 @@ public void testGzipCompressStreamReuse() throws IOException { @Test public void testZStandardCompressStreamReuse() throws IOException { - assumeTrue(ZStandardCodec.isNativeCodeLoaded()); resetStateTest(conf, seed, count, "org.apache.hadoop.io.compress.ZStandardCodec"); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java index 389efdac78005..a141a974e591f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zstd/TestZStandardCompressorDecompressor.java @@ -28,7 +28,6 @@ import org.apache.hadoop.io.compress.DecompressorStream; import org.apache.hadoop.io.compress.ZStandardCodec; import org.apache.hadoop.test.MultithreadedTestUtil; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -51,7 +50,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assumptions.assumeTrue; public class TestZStandardCompressorDecompressor { private final static char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray(); @@ -69,11 +67,6 @@ public static void beforeClass() throws Exception { .getResource("/zstd/test_file.txt.zst").toURI()); } - @BeforeEach - public void before() throws Exception { - assumeTrue(ZStandardCodec.isNativeCodeLoaded()); - } - @Test public void testCompressionCompressesCorrectly() throws Exception { int uncompressedSize = (int) FileUtils.sizeOf(uncompressedFile); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/pom.xml index ee86f64db8848..697d45f46fe0a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/pom.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/pom.xml @@ -125,10 +125,6 @@ false - - - - false @@ -146,8 +142,7 @@ mac unix - native build only supported on Mac or - Unix + native build only supported on Mac or Unix true @@ -197,10 +192,6 @@ ${snappy.prefix} ${snappy.lib} ${snappy.include} - ${require.zstd} - ${zstd.prefix} - ${zstd.lib} - ${zstd.include} diff --git a/hadoop-project-dist/pom.xml b/hadoop-project-dist/pom.xml index b162566bdcff5..42dcf39fe23c5 100644 --- a/hadoop-project-dist/pom.xml +++ b/hadoop-project-dist/pom.xml @@ -40,9 +40,6 @@ UNDEF false - - false - false false @@ -375,9 +372,6 @@ --openssllibbundle=${bundle.openssl} --snappylib=${snappy.lib} --snappylibbundle=${bundle.snappy} - --zstdbinbundle=${bundle.zstd.in.bin} - --zstdlib=${zstd.lib} - --zstdlibbundle=${bundle.zstd} diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 3bfc4099f4049..737768ba1d631 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -155,6 +155,7 @@ 4.1.130.Final 1.1.10.4 1.10.4 + 1.5.6-4 1.17.6 @@ -2095,6 +2096,11 @@ lz4-java ${lz4-java.version} + + com.github.luben + zstd-jni + ${zstd-jni.version} + org.apache.logging.log4j log4j-1.2-api @@ -2681,7 +2687,6 @@ file:/dev/urandom - true true @@ -2692,7 +2697,6 @@ - ${env.PATH};${hadoop.common.build.dir}/bin;${zstd.lib} ${env.PATH};${hadoop.common.build.dir}/bin;${openssl.lib} ${env.PATH};${hadoop.common.build.dir}/bin;${isal.lib}