Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/connection_string.ts
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,10 @@ interface OptionDescriptor {
}

export const OPTIONS = {
adaptiveRetries: {
default: false,
type: 'boolean'
},
appName: {
type: 'string'
},
Expand Down
3 changes: 3 additions & 0 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ export interface MongoClientOptions extends BSONSerializeOptions, SupportedNodeC
retryReads?: boolean;
/** Enable retryable writes. */
retryWrites?: boolean;
/** Whether to enable adaptive retry rate limiting using a token bucket. Defaults to false. */
adaptiveRetries?: boolean;
/** Allow a driver to force a Single topology type with a connection string containing one host */
directConnection?: boolean;
/** Instruct the driver it is connecting to a load balancer fronting a mongos like service */
Expand Down Expand Up @@ -1041,6 +1043,7 @@ export interface MongoOptions
extends Required<
Pick<
MongoClientOptions,
| 'adaptiveRetries'
| 'autoEncryption'
| 'connectTimeoutMS'
| 'directConnection'
Expand Down
26 changes: 16 additions & 10 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -266,11 +266,13 @@ async function executeOperationWithRetries<
try {
try {
const result = await server.command(operation, timeoutContext);
topology.tokenBucket.deposit(
attempt > 0
? RETRY_TOKEN_RETURN_RATE + RETRY_COST // on successful retry
: RETRY_TOKEN_RETURN_RATE // otherwise
);
if (topology.s.options.adaptiveRetries) {
topology.tokenBucket.deposit(
attempt > 0
? RETRY_TOKEN_RETURN_RATE + RETRY_COST // on successful retry
: RETRY_TOKEN_RETURN_RATE // otherwise
);
}
return operation.handleOk(result);
} catch (error) {
return operation.handleError(error);
Expand All @@ -279,7 +281,11 @@ async function executeOperationWithRetries<
// Should never happen but if it does - propagate the error.
if (!(operationError instanceof MongoError)) throw operationError;

if (attempt > 0 && !operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
if (
topology.s.options.adaptiveRetries &&
attempt > 0 &&
!operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)
) {
// if a retry attempt fails with a non-overload error, deposit 1 token.
topology.tokenBucket.deposit(RETRY_COST);
}
Expand Down Expand Up @@ -318,17 +324,17 @@ async function executeOperationWithRetries<
}

if (operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
if (!topology.tokenBucket.consume(RETRY_COST)) {
throw error;
}

const backoffMS = Math.random() * Math.min(MAX_BACKOFF_MS, BASE_BACKOFF_MS * 2 ** attempt);

// if the backoff would exhaust the CSOT timeout, short-circuit.
if (timeoutContext.csotEnabled() && backoffMS > timeoutContext.remainingTimeMS) {
throw error;
}

if (topology.s.options.adaptiveRetries && !topology.tokenBucket.consume(RETRY_COST)) {
throw error;
}

await setTimeout(backoffMS);
}

Expand Down
1 change: 1 addition & 0 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ export interface TopologyOptions extends BSONSerializeOptions, ServerOptions {
hosts: HostAddress[];
retryWrites: boolean;
retryReads: boolean;
adaptiveRetries: boolean;
/** How long to block for server selection before throwing an error */
serverSelectionTimeoutMS: number;
/** The name of the replica set to connect to */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,21 @@ import { expect } from 'chai';
import * as sinon from 'sinon';

import {
type Collection,
INITIAL_TOKEN_BUCKET_SIZE,
MAX_RETRIES,
type MongoClient,
MongoServerError
} from '../../mongodb';
import { clearFailPoint, configureFailPoint, measureDuration } from '../../tools/utils';
import { filterForCommands } from '../shared';

describe('Client Backpressure (Prose)', function () {
let client: MongoClient;
let collection: Collection;

beforeEach(async function () {
client = this.configuration.newClient();
await client.connect();

collection = client.db('foo').collection('bar');
});

afterEach(async function () {
sinon.restore();
await client.close();
client = undefined;
await clearFailPoint(this.configuration);
});

Expand All @@ -34,6 +28,30 @@ describe('Client Backpressure (Prose)', function () {
}
},
async function () {
// 1. Let `client` be a `MongoClient`
client = this.configuration.newClient();
await client.connect();

// 2. Let `collection` be a collection
const collection = client.db('foo').collection('bar');

// 3. Now, run transactions without backoff:
// i. Configure the random number generator used for jitter to always return `0` -- this effectively disables backoff.
const stub = sinon.stub(Math, 'random');
stub.returns(0);

// ii. Configure the following failPoint:
// ```javascript
// {
// configureFailPoint: 'failCommand',
// mode: 'alwaysOn',
// data: {
// failCommands: ['insert'],
// errorCode: 2,
// errorLabels: ['SystemOverloadedError', 'RetryableError']
// }
// }
// ```
await configureFailPoint(this.configuration, {
configureFailPoint: 'failCommand',
mode: 'alwaysOn',
Expand All @@ -44,38 +62,156 @@ describe('Client Backpressure (Prose)', function () {
}
});

const stub = sinon.stub(Math, 'random');

stub.returns(0);

// iii. Insert the document `{ a: 1 }`. Expect that the command errors. Measure the duration of the command execution.
const { duration: durationNoBackoff } = await measureDuration(async () => {
const error = await collection.insertOne({ a: 1 }).catch(e => e);
expect(error).to.be.instanceof(MongoServerError);
});

// iv. Configure the random number generator used for jitter to always return a number as close as possible to `1`.
stub.returns(0.99);

// v. Execute step iii again.
const { duration: durationBackoff } = await measureDuration(async () => {
const error = await collection.insertOne({ a: 1 }).catch(e => e);
expect(error).to.be.instanceof(MongoServerError);
});

// vi. Compare the two time between the two runs.
// The sum of 5 backoffs is 3.1 seconds. There is a 1-second window to account for potential variance between the two runs.
expect(durationBackoff - durationNoBackoff).to.be.within(3100 - 1000, 3100 + 1000);
}
);

it('Test 2: Token Bucket capacity is Enforced', async () => {
// 1-2. Assert that the client's retry token bucket is at full capacity and that the capacity
// is DEFAULT_RETRY_TOKEN_CAPACITY.
it('Test 2: Token Bucket capacity is Enforced', async function () {
// 1. Let client be a MongoClient with adaptiveRetries=True.
client = this.configuration.newClient({
adaptiveRetries: true
});
await client.connect();

// 2. Assert that the client's retry token bucket is at full capacity and that the capacity is DEFAULT_RETRY_TOKEN_CAPACITY.
const tokenBucket = client.topology.tokenBucket;
expect(tokenBucket).to.have.property('budget', INITIAL_TOKEN_BUCKET_SIZE);
expect(tokenBucket).to.have.property('capacity', INITIAL_TOKEN_BUCKET_SIZE);

// 3. Execute a successful ping command.
// 3. Using client, execute a successful ping command.
await client.db('admin').command({ ping: 1 });

// 4. Assert that the successful command did not increase the number of tokens in the bucket
// above DEFAULT_RETRY_TOKEN_CAPACITY.
// 4. Assert that the successful command did not increase the number of tokens in the bucket above DEFAULT_RETRY_TOKEN_CAPACITY.
expect(tokenBucket).to.have.property('budget').that.is.at.most(INITIAL_TOKEN_BUCKET_SIZE);
});

it(
'Test 3: Overload Errors are Retried a Maximum of MAX_RETRIES times',
{
requires: {
mongodb: '>=4.4'
}
},
async function () {
// 1. Let `client` be a `MongoClient` with command event monitoring enabled.
client = this.configuration.newClient({
monitorCommands: true
});
await client.connect();

// 2. Let `coll` be a collection.
const collection = client.db('foo').collection('bar');
const commandsStarted = [];
client.on('commandStarted', filterForCommands(['find'], commandsStarted));

/*
* 3. Configure the following failpoint:
{
configureFailPoint: 'failCommand',
mode: 'alwaysOn',
data: {
failCommands: ['find'],
errorCode: 462, // IngressRequestRateLimitExceeded
errorLabels: ['SystemOverloadedError', 'RetryableError']
}
}
* */
await configureFailPoint(this.configuration, {
configureFailPoint: 'failCommand',
mode: 'alwaysOn',
data: {
failCommands: ['find'],
errorCode: 462,
errorLabels: ['RetryableError', 'SystemOverloadedError']
}
});

// 4. Perform a find operation with `coll` that fails.
const error = await collection.findOne({}).catch(e => e);

// 5. Assert that the raised error contains both the `RetryableError` and `SystemOverloadedError` error labels.
expect(error).to.be.instanceof(MongoServerError);
expect(error.hasErrorLabel('RetryableError')).to.be.true;
expect(error.hasErrorLabel('SystemOverloadedError')).to.be.true;

// 6. Assert that the total number of started commands is MAX_RETRIES + 1 (6).
expect(commandsStarted).to.have.length(MAX_RETRIES + 1);
}
);

it(
'Test 4: Adaptive Retries are Limited by Token Bucket Tokens',
{
requires: {
mongodb: '>=4.4'
}
},
async function () {
// 1. Let `client` be a `MongoClient` with `adaptiveRetries=True` and command event monitoring enabled.
client = this.configuration.newClient({
adaptiveRetries: true,
monitorCommands: true
});
await client.connect();

// 2. Set `client`'s retry token bucket to have 2 tokens.
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
client.topology!.tokenBucket['budget'] = 2;

// 3. Let `coll` be a collection.
const collection = client.db('foo').collection('bar');
const commandsStarted = [];
client.on('commandStarted', filterForCommands(['find'], commandsStarted));

/*
* 4. Configure the following failpoint:
{
configureFailPoint: 'failCommand',
mode: {times: 3},
data: {
failCommands: ['find'],
errorCode: 462, // IngressRequestRateLimitExceeded
errorLabels: ['SystemOverloadedError', 'RetryableError']
}
}
* */
await configureFailPoint(this.configuration, {
configureFailPoint: 'failCommand',
mode: { times: 3 },
data: {
failCommands: ['find'],
errorCode: 462,
errorLabels: ['RetryableError', 'SystemOverloadedError']
}
});

// 5. Perform a find operation with `coll` that fails.
const error = await collection.findOne({}).catch(e => e);

// 6. Assert that the raised error contains both the `RetryableError` and `SystemOverloadedError` error labels.
expect(error).to.be.instanceof(MongoServerError);
expect(error.hasErrorLabel('RetryableError')).to.be.true;
expect(error.hasErrorLabel('SystemOverloadedError')).to.be.true;

// 7. Assert that the total number of started commands is 3: one for the initial attempt and two for the retries.
expect(commandsStarted).to.have.length(3);
}
);
});
35 changes: 35 additions & 0 deletions test/spec/uri-options/client-backpressure-options.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{
"tests": [
{
"description": "adaptiveRetries=true is parsed correctly",
"uri": "mongodb://example.com/?adaptiveRetries=true",
"valid": true,
"warning": false,
"hosts": null,
"auth": null,
"options": {
"adaptiveRetries": true
}
},
{
"description": "adaptiveRetries=false is parsed correctly",
"uri": "mongodb://example.com/?adaptiveRetries=false",
"valid": true,
"warning": false,
"hosts": null,
"auth": null,
"options": {
"adaptiveRetries": false
}
},
{
"description": "adaptiveRetries with invalid value causes a warning",
"uri": "mongodb://example.com/?adaptiveRetries=invalid",
"valid": true,
"warning": true,
"hosts": null,
"auth": null,
"options": null
}
]
}
27 changes: 27 additions & 0 deletions test/spec/uri-options/client-backpressure-options.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
tests:
-
description: "adaptiveRetries=true is parsed correctly"
uri: "mongodb://example.com/?adaptiveRetries=true"
valid: true
warning: false
hosts: ~
auth: ~
options:
adaptiveRetries: true
-
description: "adaptiveRetries=false is parsed correctly"
uri: "mongodb://example.com/?adaptiveRetries=false"
valid: true
warning: false
hosts: ~
auth: ~
options:
adaptiveRetries: false
-
description: "adaptiveRetries with invalid value causes a warning"
uri: "mongodb://example.com/?adaptiveRetries=invalid"
valid: true
warning: true
hosts: ~
auth: ~
options: ~
1 change: 1 addition & 0 deletions test/tools/uri_spec_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ export function executeUriValidationTest(
case 'serverSelectionTimeoutMS':
case 'serverMonitoringMode':
case 'socketTimeoutMS':
case 'adaptiveRetries':
case 'retryWrites':
case 'directConnection':
case 'loadBalanced':
Expand Down
Loading