This repository was archived by the owner on Oct 12, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
Add replicaset support #18
Open
smoliji
wants to merge
9
commits into
master
Choose a base branch
from
feat/replicasets
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
92fd48a
Add replicaset support
smoliji 5702d0c
✨ Transacted queries are passed to write replicas
smoliji 48d6976
⬇️ Use pg 7.18
smoliji 1c3fabd
✨ Pass write node config to proxy knex instance
smoliji befb3ff
✅ Add bookshelf tests
smoliji ab138d8
🚧 Prepare to remove mock replica
smoliji 5e901d5
✨ Truncate queries as write query
smoliji aab0db9
✏️ Fix query context's replica node
smoliji 2a3eca9
⬆️Update knex and pg
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| version: '3' | ||
| services: | ||
| postgres1: | ||
| image: postgres:11 | ||
| environment: | ||
| POSTGRES_USER: databless | ||
| POSTGRES_PASSWORD: databless | ||
| POSTGRES_DB: databless | ||
| ports: | ||
| - "10001:5432" | ||
| postgres2: | ||
| image: postgres:11 | ||
| environment: | ||
| POSTGRES_USER: databless | ||
| POSTGRES_PASSWORD: databless | ||
| POSTGRES_DB: databless | ||
| ports: | ||
| - "10002:5432" | ||
| postgres3: | ||
| image: postgres:11 | ||
| environment: | ||
| POSTGRES_USER: databless | ||
| POSTGRES_PASSWORD: databless | ||
| POSTGRES_DB: databless | ||
| ports: | ||
| - "10003:5432" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,113 @@ | ||
| // Based on https://github.com/knex/knex/issues/2253#issuecomment-551610832 | ||
|
|
||
| const Knex = require('knex'); | ||
| const initKnex = require('./initKnex'); | ||
|
|
||
| exports.createRoundRobinSelectionStrategy = () => { | ||
| let round = { | ||
| true: 0, | ||
| false: 0, | ||
| }; | ||
| return (pool, isWrite) => { | ||
| const selected = pool[round[isWrite]]; | ||
| round[isWrite] = (round[isWrite] + 1) % (pool.length); | ||
| return selected; | ||
| }; | ||
| }; | ||
|
|
||
| exports.isWriteQuery = (query) => { | ||
| return ['insert', 'del', 'update', 'truncate'].includes(query.method); | ||
| }; | ||
|
|
||
| exports.isWriteBuilder = (builder) => { | ||
| // Enable query context override: knex.select('*').queryContext({ replicaNode: 'write' | 'read' }) | ||
| if (builder._queryContext && 'replicaNode' in builder._queryContext) { | ||
| return builder._queryContext.replicaNode === 'write'; | ||
| } | ||
| const sql = builder.toSQL(); | ||
| return Array.isArray(sql) ? sql.some(exports.isWriteQuery) : exports.isWriteQuery(sql); | ||
| } | ||
|
|
||
| /** | ||
| * | ||
| * @param {Object} config | ||
| * @param {Array<Knex.Config>} config.writeNodes List of knex configurations | ||
| * for SQL master instances | ||
| * @param {Array<Knex.Config>} config.readNodes List of knex configurations | ||
| * for SQL read-only instances | ||
| * @param {Knex.Config} config.proxy Knex configuration for "knex proxy" | ||
| * used as a single knex handle that ultimately chooses one of the read/write | ||
| * instances, based on given strategy. | ||
| * @param {(pool: Array<Knex>, isWrite: bool) => Knex} config.select | ||
| * @param {*} key Datables knex instance key. | ||
| */ | ||
| exports.initKnex = (config = { writeNodes: [], readNodes: [], proxy: {}, select: createRoundRobinSelectionStrategy() }, key = 'default') => { | ||
| const createKnex = require('knex'); // eslint-disable-line global-require | ||
| config.select = config.select || this.createRoundRobinSelectionStrategy(); | ||
| const writeNodes = config.writeNodes.map(createKnex); | ||
| const readNodes = config.readNodes.map(createKnex); | ||
| const replicaKnex = initKnex(config.proxy, key); | ||
| replicaKnex.client.runner = function (builder) { | ||
| const useWriteNode = exports.isWriteBuilder(builder); | ||
| return config.select(useWriteNode ? writeNodes : readNodes, useWriteNode) | ||
| .client.runner(builder); | ||
| }; | ||
| replicaKnex.client.transaction = function (container, txConfig, outerTx) { | ||
| return config.select(writeNodes, true) | ||
| .client.transaction(container, txConfig, outerTx); | ||
| }; | ||
| replicaKnex.client.destroy = () => { | ||
| return Promise.all([ | ||
|
akrivohlavy marked this conversation as resolved.
|
||
| ...writeNodes.map(node => node.client.destroy()), | ||
| ...readNodes.map(node => node.client.destroy()), | ||
| ]); | ||
| }; | ||
| replicaKnex.client.config = writeNodes[0].client.config; | ||
| replicaKnex.__rdbgwReplicaWriteNodes = writeNodes; | ||
| replicaKnex.__rdbgwReplicaReadNodes = readNodes; | ||
| return replicaKnex; | ||
| }; | ||
|
|
||
| exports.writeReplicas = (knex) => knex.__rdbgwReplicaWriteNodes; | ||
| exports.readReplicas = (knex) => knex.__rdbgwReplicaReadNodes; | ||
|
|
||
|
|
||
| // WIP POC of using first write replica as for proxy knex | ||
| // @experimental | ||
| exports.initKnexMasterAsProxy = (config = { writeNodes: [], readNodes: [], select: createRoundRobinSelectionStrategy() }, key = 'default') => { | ||
| const createKnex = require('knex'); // eslint-disable-line global-require | ||
| config.select = config.select || this.createRoundRobinSelectionStrategy(); | ||
| const replicaKnex = initKnex(config.writeNodes[0], key); | ||
| const writeNodes = config.writeNodes.slice(1).map(createKnex); | ||
| writeNodes.unshift(replicaKnex); | ||
| const readNodes = config.readNodes.map(createKnex); | ||
|
|
||
| const originalClientRunner = replicaKnex.client.runner.bind(replicaKnex.client); | ||
| const originalClientTransaction = replicaKnex.client.transaction.bind(replicaKnex.client); | ||
| const originalClientDestroy = replicaKnex.client.destroy.bind(replicaKnex.client); | ||
| replicaKnex.client.runner = function (builder) { | ||
| const useWriteNode = exports.isWriteBuilder(builder); | ||
| let node = config.select(useWriteNode ? writeNodes : readNodes, useWriteNode); | ||
| if (node === replicaKnex) { | ||
| return originalClientRunner(builder); | ||
| } | ||
| return node.client.runner(builder); | ||
| }; | ||
| replicaKnex.client.transaction = function (container, txConfig, outerTx) { | ||
| let node = config.select(writeNodes, true) | ||
| if (node === replicaKnex) { | ||
| return originalClientTransaction(container, txConfig, outerTx); | ||
| } | ||
| return node.client.transaction(container, txConfig, outerTx); | ||
| }; | ||
| replicaKnex.client.destroy = () => { | ||
| return Promise.all([ | ||
| originalClientDestroy(), | ||
| ...writeNodes.slice(1).map(node => node.client.destroy()), | ||
| ...readNodes.map(node => node.client.destroy()), | ||
| ]); | ||
| }; | ||
| replicaKnex.__rdbgwReplicaWriteNodes = writeNodes; | ||
| replicaKnex.__rdbgwReplicaReadNodes = readNodes; | ||
| return replicaKnex; | ||
| }; | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,120 @@ | ||
| const databless = require('../index'); | ||
| const { replicaset } = require('../index'); | ||
|
|
||
| describe.skip('Replicaset', () => { | ||
| let knex; | ||
| beforeAll(async () => { | ||
| replicaset.initKnex({ | ||
| writeNodes: [ | ||
| { | ||
| client: 'pg', | ||
| connection: { | ||
| host: 'localhost', | ||
| port: '10001', | ||
| database: 'databless', | ||
| user: 'databless', | ||
| password: 'databless', | ||
| }, | ||
| // debug: true, | ||
| }, | ||
| ], | ||
| readNodes: [ | ||
| { | ||
| client: 'pg', | ||
| connection: { | ||
| host: 'localhost', | ||
| port: '10002', | ||
| user: 'databless', | ||
| password: 'databless', | ||
| database: 'databless', | ||
| }, | ||
| }, | ||
| { | ||
| client: 'pg', | ||
| connection: { | ||
| host: 'localhost', | ||
| port: '10003', | ||
| user: 'databless', | ||
| password: 'databless', | ||
| database: 'databless', | ||
| }, | ||
| }, | ||
| ], | ||
| proxy: { | ||
| client: 'pg', | ||
| }, | ||
| select: replicaset.createRoundRobinSelectionStrategy(), | ||
| }); | ||
|
|
||
| knex = databless.getKnex(); | ||
| // Prepare instances - purge & create some schema with a one row | ||
| // to identify an instance | ||
| await Promise.all( | ||
| [ | ||
| ['readInstance1', replicaset.readReplicas(knex)[0]], | ||
| ['readInstance2', replicaset.readReplicas(knex)[1]], | ||
| ['writeInstance1', replicaset.writeReplicas(knex)[0]], | ||
| ] | ||
| .map(async ([instance, knex]) => { | ||
| await knex.raw(` | ||
| DROP SCHEMA public CASCADE; | ||
| CREATE SCHEMA public; | ||
| `); | ||
| await knex.schema.createTable('records', table => { | ||
| table.increments('id').primary(); | ||
| table.string('title'); | ||
| }); | ||
| await knex('records').insert({ title: instance }); | ||
| }) | ||
| ); | ||
| }); | ||
| afterAll(async () => { | ||
| await knex.destroy(); | ||
| }); | ||
| test('RR for reads', async () => { | ||
| { | ||
| const result = await knex('records'); | ||
| // 1st read -> readInstance1 | ||
| expect(result.find(x => x.title === 'readInstance1')).not.toBeUndefined(); | ||
| } | ||
| { | ||
| const result = await knex('records'); | ||
| // 2nd read -> readInstance2 | ||
| expect(result.find(x => x.title === 'readInstance2')).not.toBeUndefined(); | ||
| } | ||
| { | ||
| const result = await knex('records'); | ||
| // 3rd read -> readInstance1 | ||
| expect(result.find(x => x.title === 'readInstance1')).not.toBeUndefined(); | ||
| } | ||
| { | ||
| await knex('records').insert({ title: 'inserted' }); | ||
| const result = await replicaset.writeReplicas(knex)[0]('records'); | ||
| // 1st write -> writeInstance1 | ||
| expect(result.find(x => x.title === 'inserted')).not.toBeUndefined(); | ||
| expect(result.find(x => x.title === 'writeInstance1')).not.toBeUndefined(); | ||
| } | ||
| { | ||
| const result = await knex('records'); | ||
| // 4th read -> readInstance2 | ||
| expect(result.find(x => x.title === 'readInstance2')).not.toBeUndefined(); | ||
| } | ||
| }); | ||
| test('Transacted queries always use write instances', async () => { | ||
| const TRX_INSERT = { title: 'trx-insert' }; | ||
| await knex.transaction(async trx => { | ||
| await knex('records').transacting(trx) | ||
| .insert(TRX_INSERT); | ||
| { | ||
| const result = await knex('records').transacting(trx); | ||
| expect(result.find(x => x.title === TRX_INSERT.title)).not.toBeUndefined(); | ||
| expect(result.find(x => x.title === 'writeInstance1')).not.toBeUndefined(); | ||
| } | ||
| }); | ||
| // And non-trx queries are again from read instances | ||
| { | ||
| const result = await knex('records'); | ||
| expect(result.find(x => x.title === 'writeInstance1')).toBeUndefined(); | ||
| } | ||
| }); | ||
| }); |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be passed via repostitory method options?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am afraid not, these options are passed to Bookshelf directly, but BS doesn't seem to pass these options directly below to Knex.
For now you can use:
Or I can make shortcut for the reposutory to set it via repository option
queryContextas you suggested.