-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathindex.js
More file actions
205 lines (162 loc) Β· 5.26 KB
/
index.js
File metadata and controls
205 lines (162 loc) Β· 5.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
'use strict';
const path = require('path');
const fs = require('fs');
const coroutine = require('bluebird').coroutine;
const pkgConf = require('pkg-conf');
const isAsyncSupported = require('is-async-supported');
const requireFromString = require('require-from-string');
function parseSql(str) {
const lines = str.split('\n');
const result = {
up: '',
down: '',
trash: ''
};
let current = 'trash';
for (const line of lines) {
if (line.indexOf('-- +migrate Up') === 0) {
current = 'up';
continue;
}
if (line.indexOf('-- +migrate Down') === 0) {
current = 'down';
continue;
}
result[current] = result[current] + line + '\n';
}
return result;
}
function readMigration(filePath) {
const ext = path.extname(filePath);
let content = fs.readFileSync(filePath, 'utf8');
if (ext === '.js') {
if (!isAsyncSupported()) {
const asyncToGen = require('async-to-gen');
content = asyncToGen(content).toString();
}
return requireFromString(content, filePath);
}
if (ext === '.sql') {
const migration = parseSql(content);
const up = migration.up;
const down = migration.down;
migration.up = t => t.query(up);
migration.down = t => t.query(down);
return migration;
}
}
function ensureTable(db, options) {
return db.query(`CREATE SCHEMA IF NOT EXISTS $1~;`, [options.schema])
.then(() => db.query(`SET search_path TO $1~;`, [options.schema]))
.then(() => db.query(`CREATE TABLE IF NOT EXISTS $1~ (
id serial PRIMARY KEY,
name text,
revision integer,
migration_time timestamp with time zone DEFAULT timezone('msk'::text, now()) NOT NULL,
batch integer
);`, [options.tableName]));
}
function lockup(db, options) {
return db.query(`LOCK TABLE $1~;`, [options.tableName]);
}
function transactio(work) {
return function (options) {
options = Object.assign({
connection: process.env.DATABASE_URL,
directory: './migrations',
tableName: 'migratio',
schema: 'public',
unsafe: false
}, pkgConf.sync('migratio'), options);
let db = options.db;
let pgp;
if (db === undefined) {
pgp = require('pg-promise')({noWarnings: true});
db = pgp(options.connection);
}
if (options.unsafe === true) {
return ensureTable(db, options)
.then(() => work(db, options));
}
return db.tx(t => ensureTable(t, options)
.then(() => lockup(t, options))
.then(() => work(t, options)));
};
}
function validFileName(file) {
const ext = path.extname(file);
if (ext !== '.js' && ext !== '.sql') {
return false;
}
return /^\d+/.test(file);
}
function byRevision(a, b) {
const aRev = parseInt(a, 10);
const bRev = parseInt(b, 10);
return aRev - bRev;
}
const current = coroutine(function * (t, options) {
let migrations;
if (options.revision === undefined) {
const lastBatch = (yield t.one(`SELECT coalesce(MAX(batch), 0) AS max FROM $1~`, [options.tableName])).max;
migrations = yield t.query('SELECT * FROM $1~ WHERE batch = $2 ORDER BY id ASC', [options.tableName, lastBatch]);
} else {
migrations = yield t.query('SELECT * FROM $1~ WHERE revision >= $2 ORDER BY id ASC', [options.tableName, options.revision]);
}
if (options.verbose) {
migrations.forEach(m => console.log(` ${m.name} (batch:${m.batch})`));
if (migrations.length === 0) {
console.log(` No migrations were applied`);
}
}
return migrations;
});
const up = coroutine(function * (t, options) {
const latestMigration = (yield current(t, Object.assign({}, options, {
revision: undefined,
verbose: false
}))).pop() || {};
const latestRevision = latestMigration.revision === undefined ? -1 : latestMigration.revision;
const latestBatch = latestMigration.batch || 0;
const currentBatch = latestBatch + 1;
const files = fs.readdirSync(options.directory)
.filter(validFileName)
.filter(file => parseInt(file, 10) > latestRevision)
.filter(file => parseInt(file, 10) <= (options.revision || Infinity))
.sort(byRevision);
if (files.length === 0 && options.verbose) {
console.log(` Database is up to date`);
}
for (const file of files) {
const filePath = path.resolve(path.join(options.directory, file));
const revision = parseInt(file, 10);
if (options.verbose) {
console.log(` β ${file} (batch:${currentBatch})`);
}
const migration = readMigration(filePath);
yield migration.up(t);
yield t.query(`INSERT INTO $1~ (name, revision, batch) VALUES ($2, $3, $4)`, [options.tableName, file, revision, currentBatch]);
}
});
const down = coroutine(function * (t, options) {
const currentBatch = yield current(t, Object.assign({}, options, {verbose: false}));
if (currentBatch.length === 0 && options.verbose) {
console.log(` No migrations found in database`);
}
if (options.revision !== undefined) {
// Remove migration with revision from be removed
currentBatch.shift();
}
for (const migration of currentBatch.reverse()) {
const filePath = path.resolve(path.join(options.directory, migration.name));
if (options.verbose) {
console.log(` β ${migration.name} (batch:${migration.batch})`);
}
const currentMigration = readMigration(filePath);
yield currentMigration.down(t);
yield t.query('DELETE FROM $1~ WHERE id = $2', [options.tableName, migration.id]);
}
});
module.exports.up = transactio(up);
module.exports.down = transactio(down);
module.exports.current = transactio(current);