diff --git a/README.md b/README.md index 6161771..3859585 100644 --- a/README.md +++ b/README.md @@ -127,4 +127,35 @@ update.withDetailById(id) - `composeQb` - `composeQb(options, qb => ...)` automatic wrap for composing multiple querybuilders in different layers of the application. Prevents qb option overwriting. +### Replicas + +- create a sql replicaset supported knex isntance + ```js + const { replicaset } = require('databless'); + const knex = replicaset.initKnex({ + writeNodes: [/* Knex.Config */], + readNodes: [/* Knex.Config */], + proxy: { client: /* Knex.Config['client'] */ }, + select: replicaset.createRoundRobinSelectionStrategy(), + }); + ``` + + - `proxy` is a "virtual knex" instance, that - when it comes to executing + a query - selects a real knex instance via `select` method and executes query on that knex instance. + - `select` is any fn `(instances: Knex[], isWriteQuery: bool) => Knex)`, + you can use predefined `replicaset.createRoundRobinSelectionStrategy()` as + a default Round Robin selector. +- acessing read/write Knex instances directly + - using proxy knex instance + ```js + replicaset.writeReplicas(knex) // Knex[] + replicaset.readReplicas(knex) // Knex[] + ``` +- destroying proxy instance destroys all underlying knex instances +- testing (`docker-compose` required): + - `cd docker-compose` + - `docker-compose up` + - remove `skip` from `/tests/replicaset.test.js` + - `npm t` + diff --git a/docker-compose/docker-compose.yml b/docker-compose/docker-compose.yml new file mode 100644 index 0000000..d7b7325 --- /dev/null +++ b/docker-compose/docker-compose.yml @@ -0,0 +1,26 @@ +version: '3' +services: + postgres1: + image: postgres:11 + environment: + POSTGRES_USER: databless + POSTGRES_PASSWORD: databless + POSTGRES_DB: databless + ports: + - "10001:5432" + postgres2: + image: postgres:11 + environment: + POSTGRES_USER: databless + POSTGRES_PASSWORD: databless + POSTGRES_DB: databless + ports: + - "10002:5432" + postgres3: + image: postgres:11 + environment: + POSTGRES_USER: databless + POSTGRES_PASSWORD: databless + POSTGRES_DB: databless + ports: + - "10003:5432" \ No newline at end of file diff --git a/index.js b/index.js index 6161f68..372a9ef 100644 --- a/index.js +++ b/index.js @@ -1,10 +1,12 @@ const initKnex = require('./initKnex'); +const replicaset = require('./replicaset'); const initBookshelf = require('./initBookshelf'); /* eslint-disable global-require */ module.exports = { initBookshelf, initKnex, + replicaset, registerBookshelfModels: require('./registerBookshelfModels'), defaultBookshelfRepository: require('./defaultBookshelfRepository'), getKnex: (key = 'default') => initKnex.knexInstances.get(key), diff --git a/package.json b/package.json index f0403d0..ee43666 100644 --- a/package.json +++ b/package.json @@ -39,12 +39,13 @@ "bookshelf-cursor-pagination": "^1.4.2", "bookshelf-paranoia": "^0.11.0", "desmond": "^0.5.6", - "knex": "^0.19.5", + "knex": "^0.21.21", "mysql": "^2.15.0" }, "devDependencies": { "jest": "^24.5.0", "jest-extended": "^0.11.1", + "pg": "^8.7.1", "sqlite3": "^4.0.2" } } diff --git a/replicaset.js b/replicaset.js new file mode 100644 index 0000000..386a158 --- /dev/null +++ b/replicaset.js @@ -0,0 +1,113 @@ +// Based on https://github.com/knex/knex/issues/2253#issuecomment-551610832 + +const Knex = require('knex'); +const initKnex = require('./initKnex'); + +exports.createRoundRobinSelectionStrategy = () => { + let round = { + true: 0, + false: 0, + }; + return (pool, isWrite) => { + const selected = pool[round[isWrite]]; + round[isWrite] = (round[isWrite] + 1) % (pool.length); + return selected; + }; +}; + +exports.isWriteQuery = (query) => { + return ['insert', 'del', 'update', 'truncate'].includes(query.method); +}; + +exports.isWriteBuilder = (builder) => { + // Enable query context override: knex.select('*').queryContext({ replicaNode: 'write' | 'read' }) + if (builder._queryContext && 'replicaNode' in builder._queryContext) { + return builder._queryContext.replicaNode === 'write'; + } + const sql = builder.toSQL(); + return Array.isArray(sql) ? sql.some(exports.isWriteQuery) : exports.isWriteQuery(sql); +} + +/** + * + * @param {Object} config + * @param {Array} config.writeNodes List of knex configurations + * for SQL master instances + * @param {Array} config.readNodes List of knex configurations + * for SQL read-only instances + * @param {Knex.Config} config.proxy Knex configuration for "knex proxy" + * used as a single knex handle that ultimately chooses one of the read/write + * instances, based on given strategy. + * @param {(pool: Array, isWrite: bool) => Knex} config.select + * @param {*} key Datables knex instance key. + */ +exports.initKnex = (config = { writeNodes: [], readNodes: [], proxy: {}, select: createRoundRobinSelectionStrategy() }, key = 'default') => { + const createKnex = require('knex'); // eslint-disable-line global-require + config.select = config.select || this.createRoundRobinSelectionStrategy(); + const writeNodes = config.writeNodes.map(createKnex); + const readNodes = config.readNodes.map(createKnex); + const replicaKnex = initKnex(config.proxy, key); + replicaKnex.client.runner = function (builder) { + const useWriteNode = exports.isWriteBuilder(builder); + return config.select(useWriteNode ? writeNodes : readNodes, useWriteNode) + .client.runner(builder); + }; + replicaKnex.client.transaction = function (container, txConfig, outerTx) { + return config.select(writeNodes, true) + .client.transaction(container, txConfig, outerTx); + }; + replicaKnex.client.destroy = () => { + return Promise.all([ + ...writeNodes.map(node => node.client.destroy()), + ...readNodes.map(node => node.client.destroy()), + ]); + }; + replicaKnex.client.config = writeNodes[0].client.config; + replicaKnex.__rdbgwReplicaWriteNodes = writeNodes; + replicaKnex.__rdbgwReplicaReadNodes = readNodes; + return replicaKnex; +}; + +exports.writeReplicas = (knex) => knex.__rdbgwReplicaWriteNodes; +exports.readReplicas = (knex) => knex.__rdbgwReplicaReadNodes; + + +// WIP POC of using first write replica as for proxy knex +// @experimental +exports.initKnexMasterAsProxy = (config = { writeNodes: [], readNodes: [], select: createRoundRobinSelectionStrategy() }, key = 'default') => { + const createKnex = require('knex'); // eslint-disable-line global-require + config.select = config.select || this.createRoundRobinSelectionStrategy(); + const replicaKnex = initKnex(config.writeNodes[0], key); + const writeNodes = config.writeNodes.slice(1).map(createKnex); + writeNodes.unshift(replicaKnex); + const readNodes = config.readNodes.map(createKnex); + + const originalClientRunner = replicaKnex.client.runner.bind(replicaKnex.client); + const originalClientTransaction = replicaKnex.client.transaction.bind(replicaKnex.client); + const originalClientDestroy = replicaKnex.client.destroy.bind(replicaKnex.client); + replicaKnex.client.runner = function (builder) { + const useWriteNode = exports.isWriteBuilder(builder); + let node = config.select(useWriteNode ? writeNodes : readNodes, useWriteNode); + if (node === replicaKnex) { + return originalClientRunner(builder); + } + return node.client.runner(builder); + }; + replicaKnex.client.transaction = function (container, txConfig, outerTx) { + let node = config.select(writeNodes, true) + if (node === replicaKnex) { + return originalClientTransaction(container, txConfig, outerTx); + } + return node.client.transaction(container, txConfig, outerTx); + }; + replicaKnex.client.destroy = () => { + return Promise.all([ + originalClientDestroy(), + ...writeNodes.slice(1).map(node => node.client.destroy()), + ...readNodes.map(node => node.client.destroy()), + ]); + }; + replicaKnex.__rdbgwReplicaWriteNodes = writeNodes; + replicaKnex.__rdbgwReplicaReadNodes = readNodes; + return replicaKnex; +}; diff --git a/tests/replicaset.test.js b/tests/replicaset.test.js new file mode 100644 index 0000000..c7d9c78 --- /dev/null +++ b/tests/replicaset.test.js @@ -0,0 +1,120 @@ +const databless = require('../index'); +const { replicaset } = require('../index'); + +describe.skip('Replicaset', () => { + let knex; + beforeAll(async () => { + replicaset.initKnex({ + writeNodes: [ + { + client: 'pg', + connection: { + host: 'localhost', + port: '10001', + database: 'databless', + user: 'databless', + password: 'databless', + }, + // debug: true, + }, + ], + readNodes: [ + { + client: 'pg', + connection: { + host: 'localhost', + port: '10002', + user: 'databless', + password: 'databless', + database: 'databless', + }, + }, + { + client: 'pg', + connection: { + host: 'localhost', + port: '10003', + user: 'databless', + password: 'databless', + database: 'databless', + }, + }, + ], + proxy: { + client: 'pg', + }, + select: replicaset.createRoundRobinSelectionStrategy(), + }); + + knex = databless.getKnex(); + // Prepare instances - purge & create some schema with a one row + // to identify an instance + await Promise.all( + [ + ['readInstance1', replicaset.readReplicas(knex)[0]], + ['readInstance2', replicaset.readReplicas(knex)[1]], + ['writeInstance1', replicaset.writeReplicas(knex)[0]], + ] + .map(async ([instance, knex]) => { + await knex.raw(` + DROP SCHEMA public CASCADE; + CREATE SCHEMA public; + `); + await knex.schema.createTable('records', table => { + table.increments('id').primary(); + table.string('title'); + }); + await knex('records').insert({ title: instance }); + }) + ); + }); + afterAll(async () => { + await knex.destroy(); + }); + test('RR for reads', async () => { + { + const result = await knex('records'); + // 1st read -> readInstance1 + expect(result.find(x => x.title === 'readInstance1')).not.toBeUndefined(); + } + { + const result = await knex('records'); + // 2nd read -> readInstance2 + expect(result.find(x => x.title === 'readInstance2')).not.toBeUndefined(); + } + { + const result = await knex('records'); + // 3rd read -> readInstance1 + expect(result.find(x => x.title === 'readInstance1')).not.toBeUndefined(); + } + { + await knex('records').insert({ title: 'inserted' }); + const result = await replicaset.writeReplicas(knex)[0]('records'); + // 1st write -> writeInstance1 + expect(result.find(x => x.title === 'inserted')).not.toBeUndefined(); + expect(result.find(x => x.title === 'writeInstance1')).not.toBeUndefined(); + } + { + const result = await knex('records'); + // 4th read -> readInstance2 + expect(result.find(x => x.title === 'readInstance2')).not.toBeUndefined(); + } + }); + test('Transacted queries always use write instances', async () => { + const TRX_INSERT = { title: 'trx-insert' }; + await knex.transaction(async trx => { + await knex('records').transacting(trx) + .insert(TRX_INSERT); + { + const result = await knex('records').transacting(trx); + expect(result.find(x => x.title === TRX_INSERT.title)).not.toBeUndefined(); + expect(result.find(x => x.title === 'writeInstance1')).not.toBeUndefined(); + } + }); + // And non-trx queries are again from read instances + { + const result = await knex('records'); + expect(result.find(x => x.title === 'writeInstance1')).toBeUndefined(); + } + }); +}); diff --git a/tests/replicasetbs.test.js b/tests/replicasetbs.test.js new file mode 100644 index 0000000..0c69376 --- /dev/null +++ b/tests/replicasetbs.test.js @@ -0,0 +1,138 @@ +const databless = require('../index'); +const { replicaset } = require('../index'); +const { format: sprintf } = require('util') + +describe.skip('Replicaset - Bookshelf', () => { + let knex; + beforeAll(async () => { + replicaset.initKnex({ + writeNodes: [ + { + client: 'pg', + connection: { + host: 'localhost', + port: '10001', + database: 'databless', + user: 'databless', + password: 'databless', + }, + }, + ], + readNodes: [ + { + client: 'pg', + connection: { + host: 'localhost', + port: '10001', + database: 'databless', + user: 'databless', + password: 'databless', + }, + }, + ], + proxy: { + client: 'pg', + }, + select: replicaset.createRoundRobinSelectionStrategy(), + }); + + knex = databless.getKnex(); + // Prepare instances - purge & create some schema with a one row + // to identify an instance + await Promise.all( + [ + replicaset.writeReplicas(knex) + ] + .map(async ([knex]) => { + await knex.raw(` + DROP SCHEMA public CASCADE; + CREATE SCHEMA public; + `); + }) + ); + }); + afterAll(async () => { + await knex.destroy(); + }); + describe('Bookshelf integration', () => { + let Model; + beforeAll(async () => { + await knex.schema.createTable('records', table => { + table.increments('id').primary(); + table.string('title'); + }); + const registerModel = (bookshelf) => { + const Model = databless.getBookshelf().Model.extend({ + tableName: 'records', + + tags() { + return this.belongsToMany(Tag); + } + }); + return bookshelf.model('Model', Model); + } + const bookshelf = databless.initBookshelf(knex); + Model = registerModel(bookshelf); + }); + + afterAll(async () => { + await knex.destroy(); + }); + test('CRUD', async () => { + let createId; + // Create + { + const result = (await Model.forge({ title: 'bs-model-insert' }).save()).toJSON(); + expect(result.id).not.toBeUndefined(); + createId = result.id; + await mirror(knex); + } + // Read + { + const result = (await Model.forge({ id: createId }).fetch()); + expect(result.id).toEqual(createId); + } + // Update + { + (await Model.forge({ id: createId }) + .save({ title: 'bs-model-insert-updated' })).toJSON(); + await mirror(knex); + const result = (await Model.where({ id: createId }).fetch()).toJSON(); + expect(result.title).toEqual('bs-model-insert-updated'); + } + { + await Model.where({ id: createId }).destroy(); + await mirror(knex); + const result = (await Model.where({ id: createId }).fetch()); + expect(result).toEqual(null); + } + }); + // Unksip when MIGRATION_DIR and SEED_DIR is provided + describe.skip('Seeds', () => { + beforeAll(async () => { + await knex.migrate.latest({ directory: process.env.MIGRATION_DIR }); + }); + afterAll(() => { + // Maybe your seeds create always a new instance and cannot exist gracefully + process.exit(0); + }); + test('Run seeds', async () => { + await knex.seed.run({ directory: process.env.SEED_DIR }); + }) + }); + }); +}); + +async function mirror(replicaKnex) { + const source = replicaset.writeReplicas(replicaKnex)[0]; // ! only one + const dests = replicaset.readReplicas(replicaKnex); + const records = await source('records'); + await Promise.all( + dests.map(async dest => { + await dest.raw(sprintf('TRUNCATE TABLE %s', 'records')); + await Promise.all( + records.map(record => dest('records').insert(record)) + ); + }) + ); +} \ No newline at end of file