From fc311ff276d55f89c7963a18ba88605530a587cc Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Tue, 19 May 2026 14:08:00 +0800 Subject: [PATCH] Parameterize RocksDB CQ size amplification --- .../store/config/MessageStoreConfig.java | 10 +++++++ .../store/rocksdb/RocksDBOptionsFactory.java | 14 +++++----- .../rocksdb/RocksDBOptionsFactoryTest.java | 28 +++++++++++++++++++ 3 files changed, 45 insertions(+), 7 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 9f9f4ac3df5..fadc957e9d8 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -505,6 +505,8 @@ public class MessageStoreConfig { private String rocksdbCompressionType = CompressionType.LZ4_COMPRESSION.getLibraryName(); + private int rocksdbMaxSizeAmplificationPercent = 25; + private long popRocksdbBlockCacheSize = 256 * SizeUnit.MB; private long popRocksdbWriteBufferSize = 32 * SizeUnit.MB; @@ -535,6 +537,14 @@ public void setRocksdbCompressionType(String compressionType) { this.rocksdbCompressionType = compressionType; } + public int getRocksdbMaxSizeAmplificationPercent() { + return rocksdbMaxSizeAmplificationPercent; + } + + public void setRocksdbMaxSizeAmplificationPercent(int rocksdbMaxSizeAmplificationPercent) { + this.rocksdbMaxSizeAmplificationPercent = rocksdbMaxSizeAmplificationPercent; + } + public long getPopRocksdbBlockCacheSize() { return popRocksdbBlockCacheSize; } diff --git a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java index b74cf8c85d5..5a7476d665f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java +++ b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java @@ -18,6 +18,7 @@ import org.apache.rocketmq.common.config.ConfigHelper; import org.apache.rocketmq.store.MessageStore; +import org.apache.rocketmq.store.config.MessageStoreConfig; import org.rocksdb.BlockBasedTableConfig; import org.rocksdb.BloomFilter; import org.rocksdb.ColumnFamilyOptions; @@ -58,19 +59,18 @@ public static ColumnFamilyOptions createCQCFOptions(final MessageStore messageSt setBlockCache(new LRUCache(1024 * SizeUnit.MB, 8, false)). setWholeKeyFiltering(true); + MessageStoreConfig messageStoreConfig = messageStore.getMessageStoreConfig(); ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions(); - CompactionOptionsUniversal compactionOption = new CompactionOptionsUniversal(); - compactionOption.setSizeRatio(100). - setMaxSizeAmplificationPercent(25). + CompactionOptionsUniversal compactionOption = new CompactionOptionsUniversal(). + setSizeRatio(100). + setMaxSizeAmplificationPercent(messageStoreConfig.getRocksdbMaxSizeAmplificationPercent()). setAllowTrivialMove(true). setMinMergeWidth(2). setMaxMergeWidth(Integer.MAX_VALUE). setStopStyle(CompactionStopStyle.CompactionStopStyleTotalSize). setCompressionSizePercent(-1); - String bottomMostCompressionTypeOpt = messageStore.getMessageStoreConfig() - .getBottomMostCompressionTypeForConsumeQueueStore(); - String compressionTypeOpt = messageStore.getMessageStoreConfig() - .getRocksdbCompressionType(); + String bottomMostCompressionTypeOpt = messageStoreConfig.getBottomMostCompressionTypeForConsumeQueueStore(); + String compressionTypeOpt = messageStoreConfig.getRocksdbCompressionType(); CompressionType bottomMostCompressionType = CompressionType.getCompressionType(bottomMostCompressionTypeOpt); CompressionType compressionType = CompressionType.getCompressionType(compressionTypeOpt); return columnFamilyOptions.setMaxWriteBufferNumber(4). diff --git a/store/src/test/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactoryTest.java b/store/src/test/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactoryTest.java index 1d7273968f6..ef285cd9998 100644 --- a/store/src/test/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactoryTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactoryTest.java @@ -17,13 +17,26 @@ package org.apache.rocketmq.store.rocksdb; +import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.CompactionOptionsUniversal; import org.rocksdb.CompressionType; +import org.rocksdb.RocksDB; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class RocksDBOptionsFactoryTest { + @BeforeClass + public static void loadRocksDB() { + RocksDB.loadLibrary(); + } + @Test public void testBottomMostCompressionType() { MessageStoreConfig config = new MessageStoreConfig(); @@ -31,4 +44,19 @@ public void testBottomMostCompressionType() { CompressionType.getCompressionType(config.getBottomMostCompressionTypeForConsumeQueueStore())); Assert.assertEquals(CompressionType.LZ4_COMPRESSION, CompressionType.getCompressionType("lz4")); } + + @Test + public void testConsumeQueueUniversalCompactionMaxSizeAmplificationPercent() { + MessageStoreConfig config = new MessageStoreConfig(); + config.setRocksdbMaxSizeAmplificationPercent(50); + MessageStore messageStore = mock(MessageStore.class); + when(messageStore.getMessageStoreConfig()).thenReturn(config); + + ConsumeQueueCompactionFilterFactory compactionFilterFactory = new ConsumeQueueCompactionFilterFactory(() -> 0); + try (ColumnFamilyOptions options = RocksDBOptionsFactory.createCQCFOptions(messageStore, compactionFilterFactory); + CompactionOptionsUniversal compactionOptions = options.compactionOptionsUniversal()) { + Assert.assertEquals(50, compactionOptions.maxSizeAmplificationPercent()); + } + } + }