From 8879f0e5f1bddb0407e6f4379c2bc2cd48e366bf Mon Sep 17 00:00:00 2001 From: Jerry-Tekh Date: Mon, 29 Jun 2026 16:51:40 +0100 Subject: [PATCH] fix: reload shard router ring at runtime --- src/sharding/router/shard-router.service.ts | 274 ++++++++++++++------ src/sharding/shard-config.service.ts | 130 +++++++++- src/sharding/shard-router.spec.ts | 153 ++++++++++- src/sharding/sharding.controller.ts | 9 + 4 files changed, 480 insertions(+), 86 deletions(-) diff --git a/src/sharding/router/shard-router.service.ts b/src/sharding/router/shard-router.service.ts index 88772b9c..e913ad9c 100644 --- a/src/sharding/router/shard-router.service.ts +++ b/src/sharding/router/shard-router.service.ts @@ -1,4 +1,4 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { Injectable, Logger, OnModuleDestroy } from '@nestjs/common'; import { createHash } from 'crypto'; import { ShardConfig, @@ -8,6 +8,67 @@ import { } from '../interfaces/shard.interface'; import { ShardConfigService } from '../shard-config.service'; +type RangeBucket = { min: number; max: number; shardId: string }; + +interface RoutingSnapshot { + ring: ShardNode[]; + rangeBuckets: RangeBucket[]; + shardsById: Map; +} + +class AsyncReadWriteLock { + private activeReaders = 0; + private activeWriter = false; + private waitingWriters: Array<() => void> = []; + + async write(operation: () => T | Promise): Promise { + await this.acquireWrite(); + try { + return await operation(); + } finally { + this.releaseWrite(); + } + } + + read(operation: () => T): T { + this.activeReaders++; + try { + return operation(); + } finally { + this.releaseRead(); + } + } + + private async acquireWrite(): Promise { + if (!this.activeWriter && this.activeReaders === 0) { + this.activeWriter = true; + return; + } + + await new Promise((resolve) => this.waitingWriters.push(resolve)); + this.activeWriter = true; + } + + private releaseRead(): void { + this.activeReaders--; + this.drainWriters(); + } + + private releaseWrite(): void { + this.activeWriter = false; + this.drainWriters(); + } + + private drainWriters(): void { + if (this.activeWriter || this.activeReaders > 0 || this.waitingWriters.length === 0) { + return; + } + + const nextWriter = this.waitingWriters.shift(); + nextWriter?.(); + } +} + /** * ShardRouter * @@ -22,21 +83,31 @@ import { ShardConfigService } from '../shard-config.service'; * ensure an even key distribution even with a small shard count. */ @Injectable() -export class ShardRouter { +export class ShardRouter implements OnModuleDestroy { private readonly logger = new Logger(ShardRouter.name); private readonly VIRTUAL_NODES_PER_SHARD = 150; private readonly MAX_UINT32 = 0xffffffff; + private readonly routingLock = new AsyncReadWriteLock(); + private unsubscribeConfigUpdates?: () => void; - /** Sorted list of virtual-node → shard mappings for consistent hashing */ - private ring: ShardNode[] = []; - - /** Range buckets: [min, max) → shardId */ - private rangeBuckets: Array<{ min: number; max: number; shardId: string }> = []; + /** Current immutable routing view used by route() calls */ + private routingSnapshot: RoutingSnapshot = { + ring: [], + rangeBuckets: [], + shardsById: new Map(), + }; constructor(private readonly shardConfigService: ShardConfigService) { + this.unsubscribeConfigUpdates = this.shardConfigService.onConfigUpdated(() => + this.reloadConfig(), + ); this.rebuildRing(); } + onModuleDestroy(): void { + this.unsubscribeConfigUpdates?.(); + } + // --------------------------------------------------------------------------- // Public routing API // --------------------------------------------------------------------------- @@ -54,48 +125,51 @@ export class ShardRouter { ): ShardRoutingResult { const start = Date.now(); - let shard: ShardConfig; - - switch (strategy) { - case ShardStrategy.TENANT_BASED: - shard = this.routeByTenant(key); - break; - case ShardStrategy.RANGE_BASED: - shard = this.routeByRange(key); - break; - case ShardStrategy.HASH_BASED: - default: - shard = this.routeByHash(key); - break; - } + return this.routingLock.read(() => { + const snapshot = this.routingSnapshot; + let shard: ShardConfig; + + switch (strategy) { + case ShardStrategy.TENANT_BASED: + shard = this.routeByTenant(snapshot, key); + break; + case ShardStrategy.RANGE_BASED: + shard = this.routeByRange(snapshot, key); + break; + case ShardStrategy.HASH_BASED: + default: + shard = this.routeByHash(snapshot, key); + break; + } - const isReplica = false; - if (forRead && shard.readReplicas?.length) { - // Pick a replica using weighted random selection - const replica = this.pickWeightedReplica(shard); - if (replica) { - // Return a synthetic ShardConfig representing the replica - const replicaShard: ShardConfig = { - ...shard, - id: replica.id, - host: replica.host, - port: replica.port, - }; - return { - shard: replicaShard, - isReplica: true, - routingKey: key, - resolutionTimeMs: Date.now() - start, - }; + const isReplica = false; + if (forRead && shard.readReplicas?.length) { + // Pick a replica using weighted random selection + const replica = this.pickWeightedReplica(shard); + if (replica) { + // Return a synthetic ShardConfig representing the replica + const replicaShard: ShardConfig = { + ...shard, + id: replica.id, + host: replica.host, + port: replica.port, + }; + return { + shard: replicaShard, + isReplica: true, + routingKey: key, + resolutionTimeMs: Date.now() - start, + }; + } } - } - return { - shard, - isReplica, - routingKey: key, - resolutionTimeMs: Date.now() - start, - }; + return { + shard, + isReplica, + routingKey: key, + resolutionTimeMs: Date.now() - start, + }; + }); } /** @@ -104,15 +178,63 @@ export class ShardRouter { */ rebuildRing(): void { const activeShards = this.shardConfigService.getActiveShards(); - if (activeShards.length === 0) { + const nextSnapshot = this.buildRoutingSnapshot(activeShards, this.routingSnapshot.rangeBuckets); + this.routingSnapshot = nextSnapshot; + + if (nextSnapshot.ring.length === 0) { this.logger.warn('No active shards available — consistent-hash ring is empty'); - this.ring = []; return; } + this.logger.log( + `Consistent-hash ring rebuilt with ${nextSnapshot.ring.length} virtual nodes ` + + `across ${activeShards.length} active shard(s)`, + ); + } + + /** + * Reload shard configuration and atomically publish a new routing snapshot. + */ + async reloadConfig(): Promise { + await this.routingLock.write(async () => { + this.shardConfigService.reloadConfig(); + const activeShards = this.shardConfigService.getActiveShards(); + const nextSnapshot = this.buildRoutingSnapshot( + activeShards, + this.routingSnapshot.rangeBuckets, + ); + this.routingSnapshot = nextSnapshot; + + if (nextSnapshot.ring.length === 0) { + this.logger.warn('Shard config reload produced an empty consistent-hash ring'); + return; + } + + this.logger.log( + `Shard config reloaded; ring now has ${nextSnapshot.ring.length} virtual nodes ` + + `across ${activeShards.length} active shard(s)`, + ); + }); + } + + private buildRoutingSnapshot( + activeShards: ShardConfig[], + rangeBuckets: RangeBucket[], + ): RoutingSnapshot { + if (activeShards.length === 0) { + return { + ring: [], + rangeBuckets: [...rangeBuckets], + shardsById: new Map(), + }; + } + const nodes: ShardNode[] = []; + const shardsById = new Map(); for (const shard of activeShards) { + shardsById.set(shard.id, shard); + // Scale virtual-node count by weight (100 = default) const vnodeCount = Math.round((this.VIRTUAL_NODES_PER_SHARD * shard.weight) / 100); @@ -124,72 +246,76 @@ export class ShardRouter { // Sort ascending by virtual-node position nodes.sort((a, b) => a.virtualNode - b.virtualNode); - this.ring = nodes; - this.logger.log( - `Consistent-hash ring rebuilt with ${this.ring.length} virtual nodes ` + - `across ${activeShards.length} active shard(s)`, - ); + return { + ring: nodes, + rangeBuckets: [...rangeBuckets], + shardsById, + }; } /** * Configure range buckets for RANGE_BASED routing. * @param buckets Ordered, non-overlapping range definitions */ - setRangeBuckets(buckets: Array<{ min: number; max: number; shardId: string }>): void { - this.rangeBuckets = [...buckets].sort((a, b) => a.min - b.min); - this.logger.log(`Range buckets configured: ${JSON.stringify(this.rangeBuckets)}`); + setRangeBuckets(buckets: RangeBucket[]): void { + const rangeBuckets = [...buckets].sort((a, b) => a.min - b.min); + this.routingSnapshot = { + ...this.routingSnapshot, + rangeBuckets, + }; + this.logger.log(`Range buckets configured: ${JSON.stringify(rangeBuckets)}`); } // --------------------------------------------------------------------------- // Strategy implementations // --------------------------------------------------------------------------- - private routeByHash(key: string): ShardConfig { - if (this.ring.length === 0) { + private routeByHash(snapshot: RoutingSnapshot, key: string): ShardConfig { + if (snapshot.ring.length === 0) { throw new Error('ShardRouter: consistent-hash ring is empty — no active shards'); } const keyHash = this.hash32(key); - const idx = this.findRingPosition(keyHash); - const shardId = this.ring[idx].shardId; + const idx = this.findRingPosition(snapshot.ring, keyHash); + const shardId = snapshot.ring[idx].shardId; - const shard = this.shardConfigService.getShardById(shardId); + const shard = snapshot.shardsById.get(shardId); if (!shard) { throw new Error(`ShardRouter: shard "${shardId}" not found in configuration`); } return shard; } - private routeByTenant(tenantKey: string): ShardConfig { + private routeByTenant(snapshot: RoutingSnapshot, tenantKey: string): ShardConfig { // Tenant keys are expected in the form "tenant::" or just a tenantId. // We normalise by stripping the prefix and hashing the tenant segment only // so that all data for a given tenant always lands on the same shard. const tenantId = tenantKey.replace(/^tenant:/, '').split(':')[0]; - return this.routeByHash(`tenant:${tenantId}`); + return this.routeByHash(snapshot, `tenant:${tenantId}`); } - private routeByRange(key: string): ShardConfig { + private routeByRange(snapshot: RoutingSnapshot, key: string): ShardConfig { const numeric = parseInt(key, 10); if (isNaN(numeric)) { this.logger.warn(`RANGE_BASED routing: non-numeric key "${key}" — falling back to hash`); - return this.routeByHash(key); + return this.routeByHash(snapshot, key); } - if (this.rangeBuckets.length === 0) { + if (snapshot.rangeBuckets.length === 0) { this.logger.warn('RANGE_BASED routing: no range buckets configured — falling back to hash'); - return this.routeByHash(key); + return this.routeByHash(snapshot, key); } - const bucket = this.rangeBuckets.find((b) => numeric >= b.min && numeric < b.max); + const bucket = snapshot.rangeBuckets.find((b) => numeric >= b.min && numeric < b.max); if (!bucket) { this.logger.warn( `RANGE_BASED routing: key ${numeric} falls outside all buckets — falling back to hash`, ); - return this.routeByHash(key); + return this.routeByHash(snapshot, key); } - const shard = this.shardConfigService.getShardById(bucket.shardId); + const shard = snapshot.shardsById.get(bucket.shardId); if (!shard) { throw new Error(`ShardRouter: range bucket points to unknown shard "${bucket.shardId}"`); } @@ -204,20 +330,20 @@ export class ShardRouter { * Binary search for the first virtual-node at or after `hash`. * Wraps around to index 0 when hash exceeds the last virtual-node. */ - private findRingPosition(hash: number): number { + private findRingPosition(ring: ShardNode[], hash: number): number { let lo = 0; - let hi = this.ring.length - 1; + let hi = ring.length - 1; while (lo <= hi) { const mid = (lo + hi) >>> 1; - if (this.ring[mid].virtualNode < hash) { + if (ring[mid].virtualNode < hash) { lo = mid + 1; } else { hi = mid - 1; } } - return lo % this.ring.length; // wrap around + return lo % ring.length; // wrap around } /** Deterministic 32-bit FNV-1a-style hash via Node's crypto module */ diff --git a/src/sharding/shard-config.service.ts b/src/sharding/shard-config.service.ts index 41d928e3..5e54e783 100644 --- a/src/sharding/shard-config.service.ts +++ b/src/sharding/shard-config.service.ts @@ -1,7 +1,13 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { Injectable, Logger, OnModuleDestroy, OnModuleInit } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; +import Redis from 'ioredis'; +import { getSharedRedisClient } from '../config/cache.config'; import { ShardConfig, ShardStatus, ReadReplicaConfig } from './interfaces/shard.interface'; +export const SHARD_CONFIG_UPDATED_CHANNEL = 'shard:config:updated'; + +type ShardConfigUpdateListener = (message?: string) => void | Promise; + /** * ShardConfigService * @@ -29,14 +35,27 @@ import { ShardConfig, ShardStatus, ReadReplicaConfig } from './interfaces/shard. * development environments require zero additional configuration. */ @Injectable() -export class ShardConfigService { +export class ShardConfigService implements OnModuleInit, OnModuleDestroy { private readonly logger = new Logger(ShardConfigService.name); + private readonly reloadDeadlineMs = 5000; private shards: Map = new Map(); + private configUpdateSubscriber?: Redis; + private readonly configUpdateListeners = new Set(); constructor(private readonly configService: ConfigService) { this.loadShardConfiguration(); } + async onModuleInit(): Promise { + await this.subscribeToConfigUpdates(); + } + + async onModuleDestroy(): Promise { + if (this.configUpdateSubscriber && this.configUpdateSubscriber.status !== 'end') { + await this.configUpdateSubscriber.quit(); + } + } + // --------------------------------------------------------------------------- // Public API // --------------------------------------------------------------------------- @@ -58,6 +77,28 @@ export class ShardConfigService { return this.shards.get(id); } + /** + * Reload shard topology from the current configuration source. + * + * The map is rebuilt off to the side, then swapped in one assignment so + * readers never observe a partially populated configuration. + */ + reloadConfig(): ShardConfig[] { + const nextShards = this.buildShardConfiguration(); + this.shards = nextShards; + this.logger.log(`Reloaded ${this.shards.size} shard(s) from configuration`); + return this.getAllShards(); + } + + /** + * Register a listener for runtime shard config update events. + * Returns an unsubscribe function for module teardown/tests. + */ + onConfigUpdated(listener: ShardConfigUpdateListener): () => void { + this.configUpdateListeners.add(listener); + return () => this.configUpdateListeners.delete(listener); + } + /** Update a shard's status at runtime (e.g. during draining) */ updateShardStatus(id: string, status: ShardStatus): void { const shard = this.shards.get(id); @@ -74,19 +115,26 @@ export class ShardConfigService { // --------------------------------------------------------------------------- private loadShardConfiguration(): void { + this.shards = this.buildShardConfiguration(); + this.logger.log(`Loaded ${this.shards.size} shard(s) from environment configuration`); + } + + private buildShardConfiguration(): Map { + const shards = new Map(); const shardCount = parseInt(this.configService.get('SHARD_COUNT', '0'), 10); if (shardCount === 0) { - this.loadFallbackSingleShard(); - return; + const fallbackShard = this.buildFallbackSingleShard(); + shards.set(fallbackShard.id, fallbackShard); + return shards; } for (let i = 0; i < shardCount; i++) { const shard = this.buildShardFromEnv(i); - this.shards.set(shard.id, shard); + shards.set(shard.id, shard); } - this.logger.log(`Loaded ${this.shards.size} shard(s) from environment configuration`); + return shards; } private buildShardFromEnv(index: number): ShardConfig { @@ -132,8 +180,8 @@ export class ShardConfigService { } /** Fall back to the legacy DATABASE_* variables as a single shard */ - private loadFallbackSingleShard(): void { - const shard: ShardConfig = { + private buildFallbackSingleShard(): ShardConfig { + return { id: 'shard-00', name: 'Default Shard', host: this.configService.get('DATABASE_HOST', 'localhost'), @@ -146,8 +194,70 @@ export class ShardConfigService { weight: 100, status: ShardStatus.ACTIVE, }; + } - this.shards.set(shard.id, shard); - this.logger.log('SHARD_COUNT not set — running in single-shard (fallback) mode'); + private async subscribeToConfigUpdates(): Promise { + if (!this.isConfigReloadSubscriptionEnabled()) { + this.logger.log('Shard config Redis reload subscription disabled'); + return; + } + + try { + this.configUpdateSubscriber = getSharedRedisClient(this.configService).duplicate(); + this.configUpdateSubscriber.on('error', (error) => { + this.logger.warn(`Shard config Redis subscriber error: ${(error as Error).message}`); + }); + this.configUpdateSubscriber.on('message', (channel, message) => { + if (channel === SHARD_CONFIG_UPDATED_CHANNEL) { + void this.notifyConfigUpdated(message); + } + }); + + await this.configUpdateSubscriber.subscribe(SHARD_CONFIG_UPDATED_CHANNEL); + this.logger.log(`Subscribed to Redis channel "${SHARD_CONFIG_UPDATED_CHANNEL}"`); + } catch (error) { + this.logger.warn( + `Unable to subscribe to "${SHARD_CONFIG_UPDATED_CHANNEL}": ${(error as Error).message}`, + ); + } + } + + private isConfigReloadSubscriptionEnabled(): boolean { + const configured = this.configService.get('SHARD_CONFIG_RELOAD_SUBSCRIBE_ENABLED'); + if (configured !== undefined) { + return configured.toLowerCase() !== 'false'; + } + + return process.env.NODE_ENV !== 'test'; + } + + private async notifyConfigUpdated(message?: string): Promise { + if (this.configUpdateListeners.size === 0) { + this.logger.warn( + `Received "${SHARD_CONFIG_UPDATED_CHANNEL}" but no shard config listeners are registered`, + ); + return; + } + + this.logger.log(`Received "${SHARD_CONFIG_UPDATED_CHANNEL}" event; reloading shard topology`); + + const timeout = setTimeout(() => { + this.logger.warn( + `Shard config reload listeners are still running after ${this.reloadDeadlineMs}ms`, + ); + }, this.reloadDeadlineMs); + + try { + await Promise.all( + Array.from(this.configUpdateListeners).map(async (listener) => listener(message)), + ); + } catch (error) { + this.logger.error( + `Shard config reload listener failed: ${(error as Error).message}`, + (error as Error).stack, + ); + } finally { + clearTimeout(timeout); + } } } diff --git a/src/sharding/shard-router.spec.ts b/src/sharding/shard-router.spec.ts index c4010efd..2c5b0e28 100644 --- a/src/sharding/shard-router.spec.ts +++ b/src/sharding/shard-router.spec.ts @@ -48,12 +48,23 @@ const mockShards: ShardConfig[] = [ const mockShardConfigService = { getActiveShards: jest.fn(() => mockShards), getShardById: jest.fn((id: string) => mockShards.find((s) => s.id === id)), + reloadConfig: jest.fn(() => mockShards), + onConfigUpdated: jest.fn( + (_listener: (message?: string) => void | Promise) => jest.fn(), + ), }; describe('ShardRouter', () => { let router: ShardRouter; beforeEach(async () => { + mockShardConfigService.getActiveShards.mockReturnValue(mockShards); + mockShardConfigService.getShardById.mockImplementation((id: string) => + mockShards.find((s) => s.id === id), + ); + mockShardConfigService.reloadConfig.mockReturnValue(mockShards); + mockShardConfigService.onConfigUpdated.mockReturnValue(jest.fn()); + const module: TestingModule = await Test.createTestingModule({ providers: [ShardRouter, { provide: ShardConfigService, useValue: mockShardConfigService }], }).compile(); @@ -71,14 +82,56 @@ describe('ShardRouter', () => { }); it('produces a larger ring for higher-weight shards', () => { - // White-box: access private ring via cast + // White-box: access private routing snapshot via cast router.rebuildRing(); - const ring = (router as unknown as { ring: { shardId: string }[] }).ring; + const ring = (router as unknown as { routingSnapshot: { ring: { shardId: string }[] } }) + .routingSnapshot.ring; const shard00Count = ring.filter((n) => n.shardId === 'shard-00').length; const shard02Count = ring.filter((n) => n.shardId === 'shard-02').length; // shard-02 has weight=50, shard-00 has weight=100 → shard-00 should have ~2x nodes expect(shard00Count).toBeGreaterThan(shard02Count); }); + + it('registers for shard config update events', () => { + expect(mockShardConfigService.onConfigUpdated).toHaveBeenCalledTimes(1); + }); + + it('reloads when the shard config update listener fires', async () => { + const expandedShards: ShardConfig[] = [ + ...mockShards, + { + id: 'shard-03', + name: 'Shard 3', + host: 'pg-3.internal', + port: 5432, + username: 'user', + password: 'pass', + database: 'teachlink_3', + poolMax: 30, + poolMin: 5, + weight: 100, + status: ShardStatus.ACTIVE, + }, + ]; + + mockShardConfigService.reloadConfig.mockReturnValue(expandedShards); + mockShardConfigService.getActiveShards.mockReturnValue(expandedShards); + mockShardConfigService.getShardById.mockImplementation((id: string) => + expandedShards.find((s) => s.id === id), + ); + + const updateListener = mockShardConfigService.onConfigUpdated.mock.calls[0][0] as ( + message?: string, + ) => Promise; + await updateListener('test-config-version'); + + const shardIds = new Set(); + for (let i = 0; i < 1000; i++) { + shardIds.add(router.route(`pubsub-reload-user-${i}`).shard.id); + } + + expect(shardIds).toContain('shard-03'); + }); }); // ── Hash-based routing ─────────────────────────────────────────────────── @@ -191,4 +244,100 @@ describe('ShardRouter', () => { expect(() => router.route('some-key')).toThrow('consistent-hash ring is empty'); }); }); + + describe('live reload', () => { + it('reloads shard config and routes using the new ring', async () => { + const expandedShards: ShardConfig[] = [ + ...mockShards, + { + id: 'shard-03', + name: 'Shard 3', + host: 'pg-3.internal', + port: 5432, + username: 'user', + password: 'pass', + database: 'teachlink_3', + poolMax: 30, + poolMin: 5, + weight: 100, + status: ShardStatus.ACTIVE, + }, + ]; + + mockShardConfigService.reloadConfig.mockReturnValue(expandedShards); + mockShardConfigService.getActiveShards.mockReturnValue(expandedShards); + mockShardConfigService.getShardById.mockImplementation((id: string) => + expandedShards.find((s) => s.id === id), + ); + + await router.reloadConfig(); + + const shardIds = new Set(); + for (let i = 0; i < 1000; i++) { + shardIds.add(router.route(`reloaded-user-${i}`).shard.id); + } + + expect(mockShardConfigService.reloadConfig).toHaveBeenCalled(); + expect(shardIds).toContain('shard-03'); + }); + + it('does not route against a partially rebuilt ring during reload', async () => { + let activeShards = mockShards; + const expandedShards: ShardConfig[] = [ + ...mockShards, + { + id: 'shard-03', + name: 'Shard 3', + host: 'pg-3.internal', + port: 5432, + username: 'user', + password: 'pass', + database: 'teachlink_3', + poolMax: 30, + poolMin: 5, + weight: 100, + status: ShardStatus.ACTIVE, + }, + ]; + + mockShardConfigService.getActiveShards.mockImplementation(() => activeShards); + mockShardConfigService.getShardById.mockImplementation((id: string) => + activeShards.find((s) => s.id === id), + ); + mockShardConfigService.reloadConfig.mockImplementation(() => { + activeShards = expandedShards; + return activeShards; + }); + + const observedShardIds = new Set(); + const routeErrors: Error[] = []; + let keepRouting = true; + + const routingLoop = async () => { + let i = 0; + while (keepRouting) { + try { + observedShardIds.add(router.route(`live-reload-key-${i++}`).shard.id); + } catch (error) { + routeErrors.push(error as Error); + } + await Promise.resolve(); + } + }; + + const routingPromise = routingLoop(); + await Promise.resolve(); + await router.reloadConfig(); + + for (let i = 0; i < 1000; i++) { + observedShardIds.add(router.route(`post-reload-key-${i}`).shard.id); + } + + keepRouting = false; + await routingPromise; + + expect(routeErrors).toEqual([]); + expect(observedShardIds).toContain('shard-03'); + }); + }); }); diff --git a/src/sharding/sharding.controller.ts b/src/sharding/sharding.controller.ts index 747c8f53..cd01b293 100644 --- a/src/sharding/sharding.controller.ts +++ b/src/sharding/sharding.controller.ts @@ -63,6 +63,7 @@ class AutoRebalanceDto { * POST /sharding/rebalance — manual rebalance * POST /sharding/rebalance/auto — auto rebalance analysis * GET /sharding/rebalance/plans — list rebalance plans + * POST /sharding/reload — reload shard config and hash ring * POST /sharding/ring/rebuild — rebuild consistent-hash ring */ @ApiTags('sharding') @@ -182,6 +183,14 @@ export class ShardingController { // ── Hash Ring ───────────────────────────────────────────────────────────── + @Post('reload') + @HttpCode(HttpStatus.OK) + @ApiOperation({ summary: 'Reload shard configuration and rebuild the consistent-hash ring' }) + async reloadConfig() { + await this.shardRouter.reloadConfig(); + return { message: 'Shard configuration reloaded successfully' }; + } + @Post('ring/rebuild') @HttpCode(HttpStatus.OK) @ApiOperation({ summary: 'Force a rebuild of the consistent-hash ring' })