Skip to content
This repository was archived by the owner on Oct 12, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,35 @@ update.withDetailById(id)
- `composeQb` - `composeQb(options, qb => ...)` automatic wrap for composing multiple querybuilders in different layers of the application. Prevents qb option overwriting.


### Replicas

- create a sql replicaset supported knex isntance
```js
const { replicaset } = require('databless');
const knex = replicaset.initKnex({
writeNodes: [/* Knex.Config */],
readNodes: [/* Knex.Config */],
proxy: { client: /* Knex.Config['client'] */ },
select: replicaset.createRoundRobinSelectionStrategy(),
});
```

- `proxy` is a "virtual knex" instance, that - when it comes to executing
a query - selects a real knex instance via `select` method and executes query on that knex instance.
- `select` is any fn `(instances: Knex[], isWriteQuery: bool) => Knex)`,
you can use predefined `replicaset.createRoundRobinSelectionStrategy()` as
a default Round Robin selector.
- acessing read/write Knex instances directly
- using proxy knex instance
```js
replicaset.writeReplicas(knex) // Knex[]
replicaset.readReplicas(knex) // Knex[]
```
- destroying proxy instance destroys all underlying knex instances
- testing (`docker-compose` required):
- `cd docker-compose`
- `docker-compose up`
- remove `skip` from `/tests/replicaset.test.js`
- `npm t`


26 changes: 26 additions & 0 deletions docker-compose/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
version: '3'
services:
postgres1:
image: postgres:11
environment:
POSTGRES_USER: databless
POSTGRES_PASSWORD: databless
POSTGRES_DB: databless
ports:
- "10001:5432"
postgres2:
image: postgres:11
environment:
POSTGRES_USER: databless
POSTGRES_PASSWORD: databless
POSTGRES_DB: databless
ports:
- "10002:5432"
postgres3:
image: postgres:11
environment:
POSTGRES_USER: databless
POSTGRES_PASSWORD: databless
POSTGRES_DB: databless
ports:
- "10003:5432"
2 changes: 2 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
const initKnex = require('./initKnex');
const replicaset = require('./replicaset');
const initBookshelf = require('./initBookshelf');

/* eslint-disable global-require */
module.exports = {
initBookshelf,
initKnex,
replicaset,
registerBookshelfModels: require('./registerBookshelfModels'),
defaultBookshelfRepository: require('./defaultBookshelfRepository'),
getKnex: (key = 'default') => initKnex.knexInstances.get(key),
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@
"bookshelf-cursor-pagination": "^1.4.2",
"bookshelf-paranoia": "^0.11.0",
"desmond": "^0.5.6",
"knex": "^0.19.5",
"knex": "^0.21.21",
"mysql": "^2.15.0"
},
"devDependencies": {
"jest": "^24.5.0",
"jest-extended": "^0.11.1",
"pg": "^8.7.1",
"sqlite3": "^4.0.2"
}
}
113 changes: 113 additions & 0 deletions replicaset.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Based on https://github.com/knex/knex/issues/2253#issuecomment-551610832

const Knex = require('knex');
const initKnex = require('./initKnex');

exports.createRoundRobinSelectionStrategy = () => {
let round = {
true: 0,
false: 0,
};
return (pool, isWrite) => {
const selected = pool[round[isWrite]];
round[isWrite] = (round[isWrite] + 1) % (pool.length);
return selected;
};
};

exports.isWriteQuery = (query) => {
return ['insert', 'del', 'update', 'truncate'].includes(query.method);
};

exports.isWriteBuilder = (builder) => {
// Enable query context override: knex.select('*').queryContext({ replicaNode: 'write' | 'read' })
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be passed via repostitory method options?

repo.detail({ id : 1 }, { queryContext: { replicaNode: 'write' } })

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am afraid not, these options are passed to Bookshelf directly, but BS doesn't seem to pass these options directly below to Knex.

For now you can use:

repo.detail({ id : 1 }, { qb => qb.queryContext({ replicaNode: 'write' }) } )

Or I can make shortcut for the reposutory to set it via repository option queryContext as you suggested.

if (builder._queryContext && 'replicaNode' in builder._queryContext) {
return builder._queryContext.replicaNode === 'write';
}
const sql = builder.toSQL();
return Array.isArray(sql) ? sql.some(exports.isWriteQuery) : exports.isWriteQuery(sql);
}

/**
*
* @param {Object} config
* @param {Array<Knex.Config>} config.writeNodes List of knex configurations
* for SQL master instances
* @param {Array<Knex.Config>} config.readNodes List of knex configurations
* for SQL read-only instances
* @param {Knex.Config} config.proxy Knex configuration for "knex proxy"
* used as a single knex handle that ultimately chooses one of the read/write
* instances, based on given strategy.
* @param {(pool: Array<Knex>, isWrite: bool) => Knex} config.select
* @param {*} key Datables knex instance key.
*/
exports.initKnex = (config = { writeNodes: [], readNodes: [], proxy: {}, select: createRoundRobinSelectionStrategy() }, key = 'default') => {
const createKnex = require('knex'); // eslint-disable-line global-require
config.select = config.select || this.createRoundRobinSelectionStrategy();
const writeNodes = config.writeNodes.map(createKnex);
const readNodes = config.readNodes.map(createKnex);
const replicaKnex = initKnex(config.proxy, key);
replicaKnex.client.runner = function (builder) {
const useWriteNode = exports.isWriteBuilder(builder);
return config.select(useWriteNode ? writeNodes : readNodes, useWriteNode)
.client.runner(builder);
};
replicaKnex.client.transaction = function (container, txConfig, outerTx) {
return config.select(writeNodes, true)
.client.transaction(container, txConfig, outerTx);
};
replicaKnex.client.destroy = () => {
return Promise.all([
Comment thread
akrivohlavy marked this conversation as resolved.
...writeNodes.map(node => node.client.destroy()),
...readNodes.map(node => node.client.destroy()),
]);
};
replicaKnex.client.config = writeNodes[0].client.config;
replicaKnex.__rdbgwReplicaWriteNodes = writeNodes;
replicaKnex.__rdbgwReplicaReadNodes = readNodes;
return replicaKnex;
};

exports.writeReplicas = (knex) => knex.__rdbgwReplicaWriteNodes;
exports.readReplicas = (knex) => knex.__rdbgwReplicaReadNodes;


// WIP POC of using first write replica as for proxy knex
// @experimental
exports.initKnexMasterAsProxy = (config = { writeNodes: [], readNodes: [], select: createRoundRobinSelectionStrategy() }, key = 'default') => {
const createKnex = require('knex'); // eslint-disable-line global-require
config.select = config.select || this.createRoundRobinSelectionStrategy();
const replicaKnex = initKnex(config.writeNodes[0], key);
const writeNodes = config.writeNodes.slice(1).map(createKnex);
writeNodes.unshift(replicaKnex);
const readNodes = config.readNodes.map(createKnex);

const originalClientRunner = replicaKnex.client.runner.bind(replicaKnex.client);
const originalClientTransaction = replicaKnex.client.transaction.bind(replicaKnex.client);
const originalClientDestroy = replicaKnex.client.destroy.bind(replicaKnex.client);
replicaKnex.client.runner = function (builder) {
const useWriteNode = exports.isWriteBuilder(builder);
let node = config.select(useWriteNode ? writeNodes : readNodes, useWriteNode);
if (node === replicaKnex) {
return originalClientRunner(builder);
}
return node.client.runner(builder);
};
replicaKnex.client.transaction = function (container, txConfig, outerTx) {
let node = config.select(writeNodes, true)
if (node === replicaKnex) {
return originalClientTransaction(container, txConfig, outerTx);
}
return node.client.transaction(container, txConfig, outerTx);
};
replicaKnex.client.destroy = () => {
return Promise.all([
originalClientDestroy(),
...writeNodes.slice(1).map(node => node.client.destroy()),
...readNodes.map(node => node.client.destroy()),
]);
};
replicaKnex.__rdbgwReplicaWriteNodes = writeNodes;
replicaKnex.__rdbgwReplicaReadNodes = readNodes;
return replicaKnex;
};
120 changes: 120 additions & 0 deletions tests/replicaset.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
const databless = require('../index');
const { replicaset } = require('../index');

describe.skip('Replicaset', () => {
let knex;
beforeAll(async () => {
replicaset.initKnex({
writeNodes: [
{
client: 'pg',
connection: {
host: 'localhost',
port: '10001',
database: 'databless',
user: 'databless',
password: 'databless',
},
// debug: true,
},
],
readNodes: [
{
client: 'pg',
connection: {
host: 'localhost',
port: '10002',
user: 'databless',
password: 'databless',
database: 'databless',
},
},
{
client: 'pg',
connection: {
host: 'localhost',
port: '10003',
user: 'databless',
password: 'databless',
database: 'databless',
},
},
],
proxy: {
client: 'pg',
},
select: replicaset.createRoundRobinSelectionStrategy(),
});

knex = databless.getKnex();
// Prepare instances - purge & create some schema with a one row
// to identify an instance
await Promise.all(
[
['readInstance1', replicaset.readReplicas(knex)[0]],
['readInstance2', replicaset.readReplicas(knex)[1]],
['writeInstance1', replicaset.writeReplicas(knex)[0]],
]
.map(async ([instance, knex]) => {
await knex.raw(`
DROP SCHEMA public CASCADE;
CREATE SCHEMA public;
`);
await knex.schema.createTable('records', table => {
table.increments('id').primary();
table.string('title');
});
await knex('records').insert({ title: instance });
})
);
});
afterAll(async () => {
await knex.destroy();
});
test('RR for reads', async () => {
{
const result = await knex('records');
// 1st read -> readInstance1
expect(result.find(x => x.title === 'readInstance1')).not.toBeUndefined();
}
{
const result = await knex('records');
// 2nd read -> readInstance2
expect(result.find(x => x.title === 'readInstance2')).not.toBeUndefined();
}
{
const result = await knex('records');
// 3rd read -> readInstance1
expect(result.find(x => x.title === 'readInstance1')).not.toBeUndefined();
}
{
await knex('records').insert({ title: 'inserted' });
const result = await replicaset.writeReplicas(knex)[0]('records');
// 1st write -> writeInstance1
expect(result.find(x => x.title === 'inserted')).not.toBeUndefined();
expect(result.find(x => x.title === 'writeInstance1')).not.toBeUndefined();
}
{
const result = await knex('records');
// 4th read -> readInstance2
expect(result.find(x => x.title === 'readInstance2')).not.toBeUndefined();
}
});
test('Transacted queries always use write instances', async () => {
const TRX_INSERT = { title: 'trx-insert' };
await knex.transaction(async trx => {
await knex('records').transacting(trx)
.insert(TRX_INSERT);
{
const result = await knex('records').transacting(trx);
expect(result.find(x => x.title === TRX_INSERT.title)).not.toBeUndefined();
expect(result.find(x => x.title === 'writeInstance1')).not.toBeUndefined();
}
});
// And non-trx queries are again from read instances
{
const result = await knex('records');
expect(result.find(x => x.title === 'writeInstance1')).toBeUndefined();
}
});
});
Loading