cfDescriptors = new ArrayList<>();
- this.compactionFilterFactory = new ConsumeQueueCompactionFilterFactory(messageStore::getMinPhyOffset);
-
- ColumnFamilyOptions cqCfOptions = RocksDBOptionsFactory.createCQCFOptions(this.messageStore, this.compactionFilterFactory);
+ ColumnFamilyOptions cqCfOptions = RocksDBOptionsFactory.createCQCFOptions(this.messageStore);
this.cfOptions.add(cqCfOptions);
cfDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cqCfOptions));
ColumnFamilyOptions offsetCfOptions = RocksDBOptionsFactory.createOffsetCFOptions();
this.cfOptions.add(offsetCfOptions);
cfDescriptors.add(new ColumnFamilyDescriptor(OFFSET_COLUMN_FAMILY, offsetCfOptions));
+
+ if (CqCompactionFilterJni.isLoaded()) {
+ CqCompactionFilterJni.createAndSetFilter(cqCfOptions);
+ CqCompactionFilterJni.setMinPhyOffset(messageStore.getMinPhyOffset());
+ log.info("CqCompactionFilter created and set, minPhyOffset: {}", messageStore.getMinPhyOffset());
+ } else {
+ log.warn("CqCompactionFilterJni native library not loaded, compaction filter will not be installed");
+ }
+
open(cfDescriptors);
this.defaultCFHandle = cfHandles.get(0);
this.offsetCFHandle = cfHandles.get(1);
} catch (final Exception e) {
- LOGGER.error("postLoad Failed. {}", this.dbPath, e);
+ log.error("postLoad Failed. {}", this.dbPath, e);
return false;
}
return true;
@@ -91,11 +102,6 @@ protected void preShutdown() {
if (this.offsetCFHandle != null) {
this.offsetCFHandle.close();
}
-
- if (this.compactionFilterFactory != null) {
- this.compactionFilterFactory.close();
- }
-
}
public byte[] getCQ(final byte[] keyBytes) throws RocksDBException {
@@ -116,10 +122,13 @@ public void batchPut(final WriteBatch batch) throws RocksDBException {
}
public void manualCompaction(final long minPhyOffset) {
+ if (CqCompactionFilterJni.isLoaded()) {
+ CqCompactionFilterJni.setMinPhyOffset(minPhyOffset);
+ }
try {
- manualCompaction(minPhyOffset, this.compactRangeOptions);
+ super.manualCompaction(this.compactRangeOptions);
} catch (Exception e) {
- LOGGER.error("manualCompaction Failed. minPhyOffset: {}", minPhyOffset, e);
+ log.error("manualCompaction Failed. minPhyOffset: {}", minPhyOffset, e);
}
}
@@ -130,4 +139,41 @@ public RocksIterator seekOffsetCF() {
public ColumnFamilyHandle getOffsetCFHandle() {
return this.offsetCFHandle;
}
+
+ /**
+ * Synchronously trigger compaction with an updated compaction filter threshold.
+ * This method updates the native compaction filter's minPhyOffset and then
+ * performs a full compaction on the default column family.
+ */
+ public void triggerCompactionSync(long minPhyOffset) throws RocksDBException {
+ if (CqCompactionFilterJni.isLoaded()) {
+ CqCompactionFilterJni.setMinPhyOffset(minPhyOffset);
+ }
+ db.compactRange(this.defaultCFHandle);
+ }
+
+ /**
+ * Flush all memtables to SST files.
+ */
+ public void flushAll() throws RocksDBException {
+ try (FlushOptions flushOpts = new FlushOptions()) {
+ flushOpts.setWaitForFlush(true);
+ flush(flushOpts);
+ }
+ }
+
+ /**
+ * Count all entries in the default column family by iterating. O(N), use only in tests.
+ */
+ public long countEntries() {
+ long count = 0;
+ try (RocksIterator iter = db.newIterator(this.defaultCFHandle)) {
+ iter.seekToFirst();
+ while (iter.isValid()) {
+ count++;
+ iter.next();
+ }
+ }
+ return count;
+ }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/rocksdb/CqCompactionFilterJni.java b/store/src/main/java/org/apache/rocketmq/store/rocksdb/CqCompactionFilterJni.java
new file mode 100644
index 00000000000..69d74e3364b
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/rocksdb/CqCompactionFilterJni.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.rocksdb;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.rocksdb.ColumnFamilyOptions;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+
+public class CqCompactionFilterJni {
+
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKSDB_LOGGER_NAME);
+
+ private static final AtomicLong NATIVE_FILTER_PTR = new AtomicLong(0);
+ private static volatile boolean loaded = false;
+
+ /** Platform-specific shim library name and extension. */
+ private static final String SHIM_LIB_NAME;
+ private static final String SHIM_LIB_EXTENSION;
+ private static final String ROCKSDB_JNI_LIB_NAME;
+
+ static {
+ String os = System.getProperty("os.name").toLowerCase();
+ String arch = System.getProperty("os.arch");
+ if (os.contains("mac") || os.contains("darwin") || os.contains("osx")) {
+ SHIM_LIB_NAME = "libcq_compaction_filter.dylib";
+ SHIM_LIB_EXTENSION = ".dylib";
+ ROCKSDB_JNI_LIB_NAME = arch.contains("aarch") || arch.contains("arm")
+ ? "librocksdbjni-osx-aarch64"
+ : "librocksdbjni-osx-x86_64";
+ } else if (os.contains("win")) {
+ SHIM_LIB_NAME = "cq_compaction_filter.dll";
+ SHIM_LIB_EXTENSION = ".dll";
+ ROCKSDB_JNI_LIB_NAME = "librocksdbjni-win64.dll";
+ } else {
+ SHIM_LIB_NAME = arch.contains("aarch") || arch.contains("arm")
+ ? "libcq_compaction_filter_aarch64.so"
+ : "libcq_compaction_filter.so";
+ SHIM_LIB_EXTENSION = ".so";
+ ROCKSDB_JNI_LIB_NAME = arch.contains("aarch") || arch.contains("arm")
+ ? "librocksdbjni-linux-aarch64.so"
+ : "librocksdbjni-linux64.so";
+ }
+ }
+
+ static {
+ loadNativeShim();
+ }
+
+ private static synchronized void loadNativeShim() {
+ if (loaded) {
+ return;
+ }
+
+ // Preload RocksDB's native library so that linked symbols are available
+ // when our compaction filter shim is loaded.
+ String rocksdbDir = ensureRocksDBNativeLoaded();
+
+ String libName = SHIM_LIB_NAME;
+ try (InputStream is = CqCompactionFilterJni.class
+ .getClassLoader().getResourceAsStream("native/" + libName)) {
+ if (is == null) {
+ log.error("[CqCompactionFilterJni] Native library '{}' not found on classpath", libName);
+ return;
+ }
+ File tempLib;
+ if (rocksdbDir != null) {
+ // Extract our shim to the same temp directory as the RocksDB JNI library,
+ // so that the DT_NEEDED / LC_LOAD_DYLIB dependency can be resolved.
+ tempLib = new File(rocksdbDir, libName);
+ } else {
+ // RocksDB was loaded from java.library.path; our shim can go anywhere.
+ tempLib = File.createTempFile("cq_compaction_filter_", SHIM_LIB_EXTENSION);
+ }
+ Files.copy(is, tempLib.toPath(), StandardCopyOption.REPLACE_EXISTING);
+ tempLib.deleteOnExit();
+ System.load(tempLib.getAbsolutePath());
+ loaded = true;
+ log.info("[CqCompactionFilterJni] Native library loaded from classpath: {}", tempLib.getAbsolutePath());
+ } catch (IOException e) {
+ log.error("[CqCompactionFilterJni] Failed to load native shim", e);
+ }
+ }
+
+ /**
+ * Returns whether the native compaction filter shim was successfully loaded.
+ */
+ public static boolean isLoaded() {
+ return loaded;
+ }
+
+ /**
+ * Locates and loads the RocksDB native JNI library, returning the temporary
+ * directory in which it was extracted (or null if loaded from java.library.path).
+ *
+ * This method deliberately uses {@code System.loadLibrary("rocksdbjni")}
+ * rather than {@code RocksDB.loadLibrary()} for the following reasons:
+ *
+ * - Avoid unnecessary side effects — {@code RocksDB.loadLibrary()}
+ * iterates over all compression types (snappy, lz4, zstd, bzip2, etc.)
+ * and attempts to load each one. Those libraries are not needed by this
+ * compaction filter, and the resulting {@code UnsatisfiedLinkError}s slow
+ * down startup and pollute logs.
+ * - Control the temp directory location — The caller needs to know
+ * the directory where the native JNI library was extracted so that
+ * {@code libcq_compaction_filter.so} can be placed alongside it. This is
+ * required for the dynamic linker to resolve the {@code DT_NEEDED}
+ * dependency of the custom shim. {@code RocksDB.loadLibrary()} extracts
+ * to an internal temp directory that is not exposed to callers.
+ * - Avoid class-loading coupling — {@code RocksDB.loadLibrary()}
+ * triggers the full initialization chain of the rocksdbjni Java bindings
+ * (including {@code CompressionType.values()} iteration and a singleton
+ * {@code NativeLibraryLoader} state machine). Loading the custom shim
+ * must complete before any RocksDB Java classes are exercised, to avoid
+ * native symbol resolution race conditions.
+ *
+ *
+ * @return the absolute path of the temporary directory containing the
+ * extracted RocksDB JNI library, or null if the library was loaded
+ * from {@code java.library.path} (in which case no temp directory
+ * is needed for the shim).
+ */
+ private static String ensureRocksDBNativeLoaded() {
+ // Try System.loadLibrary first (works if on java.library.path)
+ try {
+ System.loadLibrary("rocksdbjni");
+ // No temp dir needed since it's on java.library.path
+ return null;
+ } catch (UnsatisfiedLinkError ignored) {
+ // Not on java.library.path, try from JAR
+ }
+
+ // Determine the platform-specific JNI library name from RocksDB's Environment
+ String jniLibName;
+ try {
+ jniLibName = org.rocksdb.util.Environment.getJniLibraryFileName("rocksdb");
+ } catch (Exception e) {
+ jniLibName = ROCKSDB_JNI_LIB_NAME;
+ }
+
+ try (InputStream is = CqCompactionFilterJni.class.getClassLoader().getResourceAsStream(jniLibName)) {
+ if (is == null) {
+ log.error("[CqCompactionFilterJni] RocksDB native library '{}' not found on classpath", jniLibName);
+ return null;
+ }
+ // Create a temp directory and extract the library there.
+ // Our shim will be placed in the same directory so the DT_NEEDED
+ // dependency resolves correctly.
+ File tempDir = Files.createTempDirectory("rocksdb-native").toFile();
+ tempDir.deleteOnExit();
+ File tempLib = new File(tempDir, jniLibName);
+ Files.copy(is, tempLib.toPath(), StandardCopyOption.REPLACE_EXISTING);
+ tempLib.deleteOnExit();
+ System.load(tempLib.getAbsolutePath());
+ return tempDir.getAbsolutePath();
+ } catch (IOException e) {
+ log.error("[CqCompactionFilterJni] Failed to extract RocksDB native library", e);
+ return null;
+ }
+ }
+
+ /**
+ * Create a native CqCompactionFilter instance.
+ * Returns the raw C++ pointer as a jlong.
+ */
+ public static native long createNativeFilter0();
+
+ /**
+ * Update the minPhyOffset threshold on an existing native filter.
+ */
+ public static native void setMinPhyOffset0(long filterPtr, long minPhyOffset);
+
+ /**
+ * Set the native compaction filter on the ColumnFamilyOptions via the
+ * public {@code setCompactionFilter} API.
+ *
+ * The wrapper uses {@code disOwnNativeHandle()} so that closing the
+ * ColumnFamilyOptions does not free the native filter — this prevents
+ * use-after-free when AbstractRocksDBStorage closes options before the DB.
+ */
+ public static void setNativeFilter(ColumnFamilyOptions options, long filterPtr) {
+ NativeCqCompactionFilter filter = new NativeCqCompactionFilter(filterPtr);
+ options.setCompactionFilter(filter);
+ }
+
+ /**
+ * Create the native filter and set it on the ColumnFamilyOptions.
+ * Returns the native pointer for later threshold updates.
+ */
+ @SuppressWarnings("UnusedReturnValue")
+ public static long createAndSetFilter(ColumnFamilyOptions options) {
+ long ptr = createNativeFilter0();
+ NATIVE_FILTER_PTR.set(ptr);
+ setNativeFilter(options, ptr);
+ return ptr;
+ }
+
+ /**
+ * Update the minPhyOffset on the current native filter.
+ */
+ public static void setMinPhyOffset(long minPhyOffset) {
+ long ptr = NATIVE_FILTER_PTR.get();
+ if (ptr != 0) {
+ setMinPhyOffset0(ptr, minPhyOffset);
+ log.info("CqCompactionFilter setMinPhyOffset={}", minPhyOffset);
+ }
+ }
+}
\ No newline at end of file
diff --git a/store/src/main/java/org/apache/rocketmq/store/rocksdb/NativeCqCompactionFilter.java b/store/src/main/java/org/apache/rocketmq/store/rocksdb/NativeCqCompactionFilter.java
new file mode 100644
index 00000000000..6a3101c261a
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/rocksdb/NativeCqCompactionFilter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.rocksdb;
+
+import org.rocksdb.AbstractCompactionFilter;
+import org.rocksdb.Slice;
+
+/**
+ * Thin Java wrapper around a native CqCompactionFilter C++ pointer.
+ *
+ * The native filter is allocated by {@link CqCompactionFilterJni#createNativeFilter0()}
+ * and its lifetime is managed externally (it lives for the entire JVM session).
+ * {@link #disOwnNativeHandle()} is called so that {@code close()} does not
+ * free the native memory — this is critical because {@code AbstractRocksDBStorage}
+ * closes {@code ColumnFamilyOptions} (which closes this filter) before closing
+ * the DB, while background compaction threads may still reference the filter.
+ */
+class NativeCqCompactionFilter extends AbstractCompactionFilter {
+
+ NativeCqCompactionFilter(long nativeHandle) {
+ super(nativeHandle);
+ disOwnNativeHandle();
+ }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
index b74cf8c85d5..37eec67d357 100644
--- a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
+++ b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
@@ -41,8 +41,7 @@
public class RocksDBOptionsFactory {
- public static ColumnFamilyOptions createCQCFOptions(final MessageStore messageStore,
- ConsumeQueueCompactionFilterFactory consumeQueueCompactionFilterFactory) {
+ public static ColumnFamilyOptions createCQCFOptions(final MessageStore messageStore) {
BlockBasedTableConfig blockBasedTableConfig = new BlockBasedTableConfig().
setFormatVersion(5).
setIndexType(IndexType.kBinarySearch).
@@ -93,7 +92,6 @@ public static ColumnFamilyOptions createCQCFOptions(final MessageStore messageSt
setTargetFileSizeBase(256 * SizeUnit.MB).
setTargetFileSizeMultiplier(2).
setMergeOperator(new StringAppendOperator()).
- setCompactionFilterFactory(consumeQueueCompactionFilterFactory).
setReportBgIoStats(true).
setOptimizeFiltersForHits(true);
}
diff --git a/store/src/main/resources/native/cq_compaction_filter.cpp b/store/src/main/resources/native/cq_compaction_filter.cpp
new file mode 100644
index 00000000000..1d0bd84cd90
--- /dev/null
+++ b/store/src/main/resources/native/cq_compaction_filter.cpp
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Native compaction filter for ConsumeQueue entries.
+ *
+ * Subclass rocksdb::CompactionFilter directly, create instances in C++,
+ * and pass the raw C++ pointer as a jlong to Java. Java's
+ * AbstractCompactionFilter(nativeHandle) wraps it seamlessly.
+ *
+ * All rocksdb symbols are declared weak so they resolve at runtime to the
+ * symbols already loaded by the JVM's ClassLoader.
+ */
+
+#include
+#include
+#include
+#include
+
+#include "rocksdb/compaction_filter.h"
+#include "rocksdb/slice.h"
+
+/* ------------------------------------------------------------------ */
+/* Windows stub implementations */
+/* */
+/* On Linux/macOS, ELF/Mach-O shared libraries export all symbols by */
+/* default, so the shim resolves inherited virtual methods from */
+/* librocksdbjni at link time. On Windows, DLLs only export symbols */
+/* marked __declspec(dllexport) — rocksdbjni only exports JNI entry */
+/* points, not internal C++ class methods. We must provide stub */
+/* implementations for the Configurable/Customizable virtual methods */
+/* that appear in CompactionFilter's vtable. These stubs are never */
+/* called at runtime (RocksDB only invokes Filter() and Name() on */
+/* compaction filters), but the linker needs addresses for them. */
+/* ------------------------------------------------------------------ */
+
+#ifdef _WIN32
+
+#include "rocksdb/configurable.h"
+#include "rocksdb/customizable.h"
+#include
+#include
+
+namespace rocksdb {
+
+struct ConfigOptions;
+struct DBOptions;
+struct ColumnFamilyOptions;
+class OptionTypeInfo;
+
+// --- Configurable virtual methods (defined in options/configurable.cc) ---
+
+Status Configurable::GetOption(const ConfigOptions&, const std::string&,
+ std::string*) const {
+ return Status();
+}
+
+bool Configurable::AreEquivalent(const ConfigOptions&, const Configurable*,
+ std::string*) const {
+ return true;
+}
+
+Status Configurable::PrepareOptions(const ConfigOptions&) {
+ return Status();
+}
+
+Status Configurable::ValidateOptions(const DBOptions&,
+ const ColumnFamilyOptions&) const {
+ return Status();
+}
+
+const void* Configurable::GetOptionsPtr(const std::string&) const {
+ return nullptr;
+}
+
+Status Configurable::ParseStringOptions(const ConfigOptions&,
+ const std::string&) {
+ return Status();
+}
+
+Status Configurable::ConfigureOptions(
+ const ConfigOptions&,
+ const std::unordered_map&,
+ std::unordered_map*) {
+ return Status();
+}
+
+Status Configurable::ParseOption(const ConfigOptions&, const OptionTypeInfo&,
+ const std::string&, const std::string&,
+ void*) {
+ return Status();
+}
+
+bool Configurable::OptionsAreEqual(const ConfigOptions&, const OptionTypeInfo&,
+ const std::string&, const void*,
+ const void*, std::string*) const {
+ return true;
+}
+
+std::string Configurable::SerializeOptions(const ConfigOptions&,
+ const std::string&) const {
+ return "";
+}
+
+std::string Configurable::GetOptionName(const std::string& name) const {
+ return name;
+}
+
+// Non-virtual, but referenced by inline code paths
+void Configurable::RegisterOptions(const std::string&, void*,
+ const std::unordered_map*) {}
+
+Status Configurable::ConfigureFromMap(
+ const ConfigOptions&,
+ const std::unordered_map&) {
+ return Status();
+}
+
+Status Configurable::ConfigureFromMap(
+ const ConfigOptions&,
+ const std::unordered_map&,
+ std::unordered_map*) {
+ return Status();
+}
+
+Status Configurable::ConfigureOption(const ConfigOptions&, const std::string&,
+ const std::string&) {
+ return Status();
+}
+
+Status Configurable::ConfigureFromString(const ConfigOptions&,
+ const std::string&) {
+ return Status();
+}
+
+Status Configurable::GetOptionString(const ConfigOptions&,
+ std::string*) const {
+ return Status();
+}
+
+std::string Configurable::ToString(const ConfigOptions&,
+ const std::string&) const {
+ return "";
+}
+
+Status Configurable::GetOptionNames(const ConfigOptions&,
+ std::unordered_set*) const {
+ return Status();
+}
+
+Status Configurable::GetOptionsMap(
+ const std::string&, const std::string&, std::string*,
+ std::unordered_map*) {
+ return Status();
+}
+
+// --- Customizable virtual/override methods (defined in options/customizable.cc) ---
+
+Status Customizable::GetOption(const ConfigOptions&, const std::string&,
+ std::string*) const {
+ return Status();
+}
+
+bool Customizable::AreEquivalent(const ConfigOptions&, const Configurable*,
+ std::string*) const {
+ return true;
+}
+
+std::string Customizable::GetOptionName(const std::string& name) const {
+ return name;
+}
+
+std::string Customizable::SerializeOptions(const ConfigOptions&,
+ const std::string&) const {
+ return "";
+}
+
+std::string Customizable::GenerateIndividualId() const {
+ return "stub";
+}
+
+Status Customizable::GetOptionsMap(
+ const ConfigOptions&, const Customizable*, const std::string&,
+ std::string*, std::unordered_map*) {
+ return Status();
+}
+
+Status Customizable::ConfigureNewObject(
+ const ConfigOptions&, Customizable*,
+ const std::unordered_map&) {
+ return Status();
+}
+
+// --- Status methods (defined in util/status.cc) ---
+
+Status::Status(Code _code, SubCode _subcode, const Slice& msg,
+ const Slice& msg2, Severity sev)
+ : code_(_code), subcode_(_subcode), sev_(sev),
+ retryable_(false), data_loss_(false), scope_(0) {}
+
+std::unique_ptr Status::CopyState(const char* s) {
+ if (s == nullptr) return nullptr;
+ const size_t n = std::strlen(s) + 1;
+ char* result = new char[n];
+ std::memcpy(result, s, n);
+ return std::unique_ptr(result);
+}
+
+std::string Status::ToString() const {
+ return "OK";
+}
+
+} // namespace rocksdb
+
+#endif // _WIN32
+
+/* ------------------------------------------------------------------ */
+/* Our concrete compaction filter */
+/* ------------------------------------------------------------------ */
+
+class CqCompactionFilter : public rocksdb::CompactionFilter {
+public:
+ const char* Name() const override {
+ return "ConsumeQueueCompactionFilter";
+ }
+
+ bool Filter(int /*level*/, const rocksdb::Slice& /*key*/,
+ const rocksdb::Slice& existing_value, std::string* /*new_value*/,
+ bool* /*value_changed*/) const override {
+ static const int CQ_MIN_SIZE = 28;
+ if (existing_value.size() < static_cast(CQ_MIN_SIZE)) {
+ return false;
+ }
+ const unsigned char* data =
+ reinterpret_cast(existing_value.data());
+ int64_t phy_offset =
+ (static_cast(data[0]) << 56) |
+ (static_cast(data[1]) << 48) |
+ (static_cast(data[2]) << 40) |
+ (static_cast(data[3]) << 32) |
+ (static_cast(data[4]) << 24) |
+ (static_cast(data[5]) << 16) |
+ (static_cast(data[6]) << 8) |
+ (static_cast(data[7]));
+
+ int64_t min_offset = min_phy_offset_.load(std::memory_order_relaxed);
+ return phy_offset < min_offset;
+ }
+
+ void SetMinPhyOffset(int64_t offset) {
+ min_phy_offset_.store(offset, std::memory_order_relaxed);
+ }
+
+private:
+ std::atomic min_phy_offset_{0};
+};
+
+/* ------------------------------------------------------------------ */
+/* JNI bindings */
+/* ------------------------------------------------------------------ */
+
+#include
+
+extern "C" {
+
+JNIEXPORT jlong JNICALL
+Java_org_apache_rocketmq_store_rocksdb_CqCompactionFilterJni_createNativeFilter0(
+ JNIEnv* env, jclass clazz) {
+ CqCompactionFilter* filter = new CqCompactionFilter();
+ return reinterpret_cast(filter);
+}
+
+JNIEXPORT void JNICALL
+Java_org_apache_rocketmq_store_rocksdb_CqCompactionFilterJni_setMinPhyOffset0(
+ JNIEnv* env, jclass clazz, jlong filterPtr, jlong minPhyOffset) {
+ CqCompactionFilter* filter = reinterpret_cast(filterPtr);
+ filter->SetMinPhyOffset(minPhyOffset);
+}
+
+} // extern "C"
diff --git a/store/src/main/resources/native/cq_compaction_filter.dll b/store/src/main/resources/native/cq_compaction_filter.dll
new file mode 100755
index 00000000000..2dc74834f41
Binary files /dev/null and b/store/src/main/resources/native/cq_compaction_filter.dll differ
diff --git a/store/src/main/resources/native/libcq_compaction_filter.dylib b/store/src/main/resources/native/libcq_compaction_filter.dylib
new file mode 100755
index 00000000000..58d6e6796a7
Binary files /dev/null and b/store/src/main/resources/native/libcq_compaction_filter.dylib differ
diff --git a/store/src/main/resources/native/libcq_compaction_filter.so b/store/src/main/resources/native/libcq_compaction_filter.so
new file mode 100755
index 00000000000..46dabe1880a
Binary files /dev/null and b/store/src/main/resources/native/libcq_compaction_filter.so differ
diff --git a/store/src/main/resources/native/libcq_compaction_filter_aarch64.so b/store/src/main/resources/native/libcq_compaction_filter_aarch64.so
new file mode 100755
index 00000000000..b77869f063d
Binary files /dev/null and b/store/src/main/resources/native/libcq_compaction_filter_aarch64.so differ
diff --git a/store/src/test/java/org/apache/rocketmq/store/rocksdb/CqCompactionFilterJniTest.java b/store/src/test/java/org/apache/rocketmq/store/rocksdb/CqCompactionFilterJniTest.java
new file mode 100644
index 00000000000..eead66b0741
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/rocksdb/CqCompactionFilterJniTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.UUID;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.WriteBatch;
+
+public class CqCompactionFilterJniTest {
+
+ private static final int TOPIC_COUNT = 100;
+ private static final int BATCH_SIZE = 100_000;
+ private static final int MSG_SIZE = 1000;
+
+ private static final byte CTRL_1 = '\u0001';
+ private ConsumeQueueRocksDBStorage storage;
+
+ @Before
+ public void setUp() throws Exception {
+ Assume.assumeTrue("CqCompactionFilterJni native library must be loaded", CqCompactionFilterJni.isLoaded());
+ String dbPath = Files.createTempDirectory("rocksdb-cq-compaction-" + UUID.randomUUID()).toString();
+ MessageStore mockStore = Mockito.mock(MessageStore.class);
+ Mockito.when(mockStore.getMinPhyOffset()).thenReturn(0L);
+ Mockito.when(mockStore.getMessageStoreConfig()).thenReturn(new MessageStoreConfig());
+ storage = new ConsumeQueueRocksDBStorage(mockStore, dbPath);
+ }
+
+ @After
+ public void tearDown() {
+ if (storage != null) {
+ storage.shutdown();
+ storage.destroy();
+ }
+ }
+
+ @Test
+ public void testCreateAndSetFilter() {
+ Assert.assertTrue("Native library should be loaded", CqCompactionFilterJni.isLoaded());
+
+ long ptr = CqCompactionFilterJni.createNativeFilter0();
+ Assert.assertTrue("Native filter pointer should be non-zero", ptr != 0);
+
+ CqCompactionFilterJni.setMinPhyOffset0(ptr, 1000);
+ CqCompactionFilterJni.setMinPhyOffset0(ptr, Long.MAX_VALUE);
+
+ try (ColumnFamilyOptions options = new ColumnFamilyOptions()) {
+ CqCompactionFilterJni.setNativeFilter(options, ptr);
+ }
+ }
+
+ @Test
+ public void testCompactionFilter_small() throws Exception {
+ runCompactionTest(1_000_000);
+ }
+
+ @Test
+ public void testCompactionFilter_large() throws Exception {
+ runCompactionTest(10_000_000);
+ }
+
+ private void runCompactionTest(int totalEntries) throws Exception {
+ long start = System.currentTimeMillis();
+ boolean result = storage.start();
+ if (!result) {
+ System.err.println("storage.start() returned false. Check ERROR logs above for details.");
+ }
+ Assert.assertTrue("ConsumeQueueRocksDBStorage failed to start", result);
+ log("Startup took %d ms", System.currentTimeMillis() - start);
+
+ // Phase 1: Write entries
+ start = System.currentTimeMillis();
+ writeEntries(totalEntries);
+ long writeTime = System.currentTimeMillis() - start;
+ log("Wrote %d entries in %d ms (%.0f entries/sec)", totalEntries, writeTime, totalEntries * 1000.0 / writeTime);
+
+ // Phase 2: Count entries before compaction
+ start = System.currentTimeMillis();
+ long countBefore = storage.countEntries();
+ long countTime = System.currentTimeMillis() - start;
+ log("Count before compaction: %d (took %d ms)", countBefore, countTime);
+ Assert.assertEquals("Entry count should match total written", totalEntries, countBefore);
+
+ // Flush memtables to SST files so compaction has something to process
+ start = System.currentTimeMillis();
+ storage.flushAll();
+ log("Flush took %d ms", System.currentTimeMillis() - start);
+
+ // Phase 3: Set minPhyOffset at midpoint and trigger compaction
+ long minPhyOffset = (long) (totalEntries / 2.0) * MSG_SIZE;
+ start = System.currentTimeMillis();
+ storage.triggerCompactionSync(minPhyOffset);
+ long compactTime = System.currentTimeMillis() - start;
+ log("Compaction with minPhyOffset=%d took %d ms", minPhyOffset, compactTime);
+
+ // Phase 4: Count entries after compaction
+ start = System.currentTimeMillis();
+ long countAfter = storage.countEntries();
+ countTime = System.currentTimeMillis() - start;
+ log("Count after compaction: %d (took %d ms)", countAfter, countTime);
+
+ // Verify: approximately half the entries should remain
+ long expectedSurvivors = totalEntries - totalEntries / 2;
+ long tolerance = Math.max(expectedSurvivors / 100, 100);
+ Assert.assertTrue(
+ "Expected ~" + expectedSurvivors + " entries after compaction, but got " + countAfter,
+ countAfter >= expectedSurvivors - tolerance && countAfter <= expectedSurvivors + tolerance
+ );
+
+ log("Test passed: %d -> %d entries (expected ~%d)", totalEntries, countAfter, expectedSurvivors);
+ }
+
+ private void writeEntries(int totalEntries) throws Exception {
+ int entriesPerTopic = totalEntries / TOPIC_COUNT;
+
+ for (int t = 0; t < TOPIC_COUNT; t++) {
+ String topic = "test-topic-" + t;
+ byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
+ int queueId = 0;
+
+ try (WriteBatch batch = new WriteBatch()) {
+ for (int i = 0; i < entriesPerTopic; i++) {
+ int globalIndex = t * entriesPerTopic + i;
+
+ // Key: [topic_len:4][CTRL_1][topic][CTRL_1][queue_id:4][CTRL_1][cq_offset:8]
+ int keyLen = Integer.BYTES + 1 + topicBytes.length + 1 + Integer.BYTES + 1 + Long.BYTES;
+ ByteBuffer keyBB = ByteBuffer.allocate(keyLen);
+ keyBB.putInt(topicBytes.length)
+ .put(CTRL_1)
+ .put(topicBytes)
+ .put(CTRL_1)
+ .putInt(queueId)
+ .put(CTRL_1)
+ .putLong(i);
+
+ // Value: [phy_offset:8][msg_size:4][tags_code:8][store_timestamp:8] (28 bytes)
+ long phyOffset = (long) globalIndex * MSG_SIZE;
+ ByteBuffer valueBB = ByteBuffer.allocate(28);
+ valueBB.putLong(phyOffset)
+ .putInt(MSG_SIZE)
+ .putLong(0)
+ .putLong(System.currentTimeMillis());
+
+ batch.put(storage.getDefaultCFHandle(), keyBB.array(), valueBB.array());
+
+ if ((i + 1) % BATCH_SIZE == 0) {
+ storage.batchPut(batch);
+ }
+ }
+ if (entriesPerTopic % BATCH_SIZE != 0) {
+ storage.batchPut(batch);
+ }
+ }
+ }
+ }
+
+ private void log(String format, Object... args) {
+ System.out.printf("[CqCompactionFilterJniTest] " + format + "%n", args);
+ }
+}