diff --git a/tx/vs/vs-database.js b/tx/vs/vs-database.js index 3f4dc58a..08087ac6 100644 --- a/tx/vs/vs-database.js +++ b/tx/vs/vs-database.js @@ -37,9 +37,9 @@ class ValueSetDatabase { if (!hasCol) { migrations.push(new Promise((res, rej) => { db.run( - "ALTER TABLE valuesets ADD COLUMN date_first_seen INTEGER DEFAULT 0", - [], - (err) => err ? rej(err) : res() + "ALTER TABLE valuesets ADD COLUMN date_first_seen INTEGER DEFAULT 0", + [], + (err) => err ? rej(err) : res() ); })); } @@ -47,13 +47,22 @@ class ValueSetDatabase { migrations.push(new Promise((res, rej) => { db.run(` CREATE TABLE IF NOT EXISTS vsac_runs ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - started_at INTEGER NOT NULL, - finished_at INTEGER, - status TEXT NOT NULL DEFAULT 'running', - error_message TEXT, - total_fetched INTEGER, - total_new INTEGER + id INTEGER PRIMARY KEY AUTOINCREMENT, + started_at INTEGER NOT NULL, + finished_at INTEGER, + status TEXT NOT NULL DEFAULT 'running', + error_message TEXT, + total_fetched INTEGER, + total_new INTEGER + ) + `, [], (err) => err ? rej(err) : res()); + })); + // Ensure vsac_settings table exists (for _lastUpdated tracking etc.) + migrations.push(new Promise((res, rej) => { + db.run(` + CREATE TABLE IF NOT EXISTS vsac_settings ( + key TEXT PRIMARY KEY, + value TEXT ) `, [], (err) => err ? rej(err) : res()); })); @@ -170,69 +179,77 @@ class ValueSetDatabase { db.serialize(() => { // Main value sets table db.run(` - CREATE TABLE valuesets ( - id TEXT PRIMARY KEY, - url TEXT, - version TEXT, - date TEXT, - description TEXT, - effectivePeriod_start TEXT, - effectivePeriod_end TEXT, - expansion_identifier TEXT, - name TEXT, - publisher TEXT, - status TEXT, - title TEXT, - content TEXT NOT NULL, - last_seen INTEGER DEFAULT (strftime('%s', 'now')), - date_first_seen INTEGER DEFAULT (strftime('%s', 'now')) - ) + CREATE TABLE valuesets ( + id TEXT PRIMARY KEY, + url TEXT, + version TEXT, + date TEXT, + description TEXT, + effectivePeriod_start TEXT, + effectivePeriod_end TEXT, + expansion_identifier TEXT, + name TEXT, + publisher TEXT, + status TEXT, + title TEXT, + content TEXT NOT NULL, + last_seen INTEGER DEFAULT (strftime('%s', 'now')), + date_first_seen INTEGER DEFAULT (strftime('%s', 'now')) + ) `); // Identifiers table (0..* Identifier) db.run(` - CREATE TABLE valueset_identifiers ( - valueset_id TEXT, - system TEXT, - value TEXT, - use_code TEXT, - type_system TEXT, - type_code TEXT, - FOREIGN KEY (valueset_id) REFERENCES valuesets(url) - ) + CREATE TABLE valueset_identifiers ( + valueset_id TEXT, + system TEXT, + value TEXT, + use_code TEXT, + type_system TEXT, + type_code TEXT, + FOREIGN KEY (valueset_id) REFERENCES valuesets(url) + ) `); // Jurisdictions table (0..* CodeableConcept with 0..* Coding) db.run(` - CREATE TABLE valueset_jurisdictions ( - valueset_id TEXT, - system TEXT, - code TEXT, - display TEXT, - FOREIGN KEY (valueset_id) REFERENCES valuesets(url) - ) + CREATE TABLE valueset_jurisdictions ( + valueset_id TEXT, + system TEXT, + code TEXT, + display TEXT, + FOREIGN KEY (valueset_id) REFERENCES valuesets(url) + ) `); // Systems table (from compose.include[].system) db.run(` - CREATE TABLE valueset_systems ( - valueset_id TEXT, - system TEXT, - version TEXT, - FOREIGN KEY (valueset_id) REFERENCES valuesets(url) - ) + CREATE TABLE valueset_systems ( + valueset_id TEXT, + system TEXT, + version TEXT, + FOREIGN KEY (valueset_id) REFERENCES valuesets(url) + ) `); // Run tracking table db.run(` - CREATE TABLE vsac_runs ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - started_at INTEGER NOT NULL, - finished_at INTEGER, - status TEXT NOT NULL DEFAULT 'running', - error_message TEXT, - total_fetched INTEGER, - total_new INTEGER + CREATE TABLE vsac_runs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + started_at INTEGER NOT NULL, + finished_at INTEGER, + status TEXT NOT NULL DEFAULT 'running', + error_message TEXT, + total_fetched INTEGER, + total_new INTEGER + ) + `); + + // Settings table (key-value store for _lastUpdated tracking etc.) + db.run(` + CREATE TABLE IF NOT EXISTS vsac_settings ( + key TEXT PRIMARY KEY, + value TEXT ) `); @@ -271,9 +288,9 @@ class ValueSetDatabase { const db = await this._getWriteConnection(); return new Promise((resolve, reject) => { db.run( - `INSERT INTO vsac_runs (started_at, status) VALUES (strftime('%s','now'), 'running')`, - [], - function(err) { err ? reject(err) : resolve(this.lastID); } + `INSERT INTO vsac_runs (started_at, status) VALUES (strftime('%s','now'), 'running')`, + [], + function(err) { err ? reject(err) : resolve(this.lastID); } ); }); } @@ -289,10 +306,10 @@ class ValueSetDatabase { const db = await this._getWriteConnection(); return new Promise((resolve, reject) => { db.run( - `UPDATE vsac_runs SET finished_at = strftime('%s','now'), status = 'ok', - total_fetched = ?, total_new = ? WHERE id = ?`, - [totalFetched, totalNew, id], - err => err ? reject(err) : resolve() + `UPDATE vsac_runs SET finished_at = strftime('%s','now'), status = 'ok', + total_fetched = ?, total_new = ? WHERE id = ?`, + [totalFetched, totalNew, id], + err => err ? reject(err) : resolve() ); }); } @@ -307,10 +324,43 @@ class ValueSetDatabase { const db = await this._getWriteConnection(); return new Promise((resolve, reject) => { db.run( - `UPDATE vsac_runs SET finished_at = strftime('%s','now'), status = 'error', - error_message = ? WHERE id = ?`, - [errorMessage, id], - err => err ? reject(err) : resolve() + `UPDATE vsac_runs SET finished_at = strftime('%s','now'), status = 'error', + error_message = ? WHERE id = ?`, + [errorMessage, id], + err => err ? reject(err) : resolve() + ); + }); + } + + /** + * Get a setting value from the vsac_settings table + * @param {string} key - The setting key + * @returns {Promise} The setting value, or null if not found + */ + async getSetting(key) { + const db = await this._getReadConnection(); + return new Promise((resolve, reject) => { + db.get('SELECT value FROM vsac_settings WHERE key = ?', [key], (err, row) => { + if (err) reject(err); + else resolve(row ? row.value : null); + }); + }); + } + + /** + * Set a setting value in the vsac_settings table + * @param {string} key - The setting key + * @param {string} value - The setting value + * @returns {Promise} + */ + async setSetting(key, value) { + const db = await this._getWriteConnection(); + return new Promise((resolve, reject) => { + db.run( + `INSERT INTO vsac_settings (key, value) VALUES (?, ?) + ON CONFLICT(key) DO UPDATE SET value = excluded.value`, + [key, value], + err => err ? reject(err) : resolve() ); }); } @@ -353,24 +403,24 @@ class ValueSetDatabase { const expansionId = valueSet.expansion?.identifier || null; db.run(` - INSERT INTO valuesets ( - id, url, version, date, description, effectivePeriod_start, effectivePeriod_end, - expansion_identifier, name, publisher, status, title, content, last_seen, date_first_seen - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, strftime('%s', 'now'), strftime('%s', 'now')) - ON CONFLICT(id) DO UPDATE SET - url=excluded.url, - version=excluded.version, - date=excluded.date, - description=excluded.description, - effectivePeriod_start=excluded.effectivePeriod_start, - effectivePeriod_end=excluded.effectivePeriod_end, - expansion_identifier=excluded.expansion_identifier, - name=excluded.name, - publisher=excluded.publisher, - status=excluded.status, - title=excluded.title, - content=excluded.content, - last_seen=strftime('%s', 'now') + INSERT INTO valuesets ( + id, url, version, date, description, effectivePeriod_start, effectivePeriod_end, + expansion_identifier, name, publisher, status, title, content, last_seen, date_first_seen + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, strftime('%s', 'now'), strftime('%s', 'now')) + ON CONFLICT(id) DO UPDATE SET + url=excluded.url, + version=excluded.version, + date=excluded.date, + description=excluded.description, + effectivePeriod_start=excluded.effectivePeriod_start, + effectivePeriod_end=excluded.effectivePeriod_end, + expansion_identifier=excluded.expansion_identifier, + name=excluded.name, + publisher=excluded.publisher, + status=excluded.status, + title=excluded.title, + content=excluded.content, + last_seen=strftime('%s', 'now') `, [ valueSet.id, valueSet.url, @@ -414,10 +464,10 @@ class ValueSetDatabase { return new Promise((resolve, reject) => { db.run(` - update valuesets - set last_seen = strftime('%s', 'now') - where url = ? - and version = ? + update valuesets + set last_seen = strftime('%s', 'now') + where url = ? + and version = ? `, [ valueSet.url, valueSet.version @@ -466,9 +516,9 @@ class ValueSetDatabase { const typeCode = id.type?.coding?.[0]?.code || null; db.run(` - INSERT INTO valueset_identifiers ( - valueset_id, system, value, use_code, type_system, type_code - ) VALUES (?, ?, ?, ?, ?, ?) + INSERT INTO valueset_identifiers ( + valueset_id, system, value, use_code, type_system, type_code + ) VALUES (?, ?, ?, ?, ?, ?) `, [ valueSet.id, id.system || null, @@ -490,9 +540,9 @@ class ValueSetDatabase { for (const coding of jurisdiction.coding) { pendingOperations++; db.run(` - INSERT INTO valueset_jurisdictions ( - valueset_id, system, code, display - ) VALUES (?, ?, ?, ?) + INSERT INTO valueset_jurisdictions ( + valueset_id, system, code, display + ) VALUES (?, ?, ?, ?) `, [ valueSet.id, coding.system || null, @@ -514,7 +564,7 @@ class ValueSetDatabase { pendingOperations++; db.run(` - INSERT INTO valueset_systems (valueset_id, system, version) VALUES (?, ?, ?) + INSERT INTO valueset_systems (valueset_id, system, version) VALUES (?, ?, ?) `, [valueSet.id, include.system, include.version], function(err) { if (err) { operationError(new Error(`Failed to insert system: ${err.message}`)); @@ -598,12 +648,12 @@ class ValueSetDatabase { async search(spaceId, map, searchParams, elements = null) { // Check if we can optimize by selecting only indexed columns const canOptimize = elements && elements.length > 0 && - elements.every(e => INDEXED_COLUMNS.includes(e)); + elements.every(e => INDEXED_COLUMNS.includes(e)); // Always include 'id' in the columns to select when optimizing const columnsToSelect = canOptimize - ? (elements.includes('id') ? elements : ['id', ...elements]) - : null; + ? (elements.includes('id') ? elements : ['id', ...elements]) + : null; const db = await this._getReadConnection(); diff --git a/tx/vs/vs-vsac.js b/tx/vs/vs-vsac.js index 4ea21ddd..8acee74c 100644 --- a/tx/vs/vs-vsac.js +++ b/tx/vs/vs-vsac.js @@ -78,7 +78,7 @@ class VSACValueSetProvider extends AbstractValueSetProvider { if (this.valueSetMap.size == 0) { await this.refreshValueSets(); } - + await this.refreshValueSets(); // TODO: remove this // Start periodic refresh this._startRefreshTimer(); this.initialized = true; @@ -174,6 +174,14 @@ class VSACValueSetProvider extends AbstractValueSetProvider { console.log(`VSAC refresh phase 1 done. Total: ${count} with ${ncount} new items`); this.stats.task('VSAC Sync', `VSAC refresh phase 1 done. Total: ${count} with ${ncount} new items`); + // phase 1b: query for recently updated value sets via _lastUpdated + let lastUpdatedCount = await this._scanLastUpdated(); + console.log(`VSAC refresh phase 1b done. ${lastUpdatedCount} additional items from _lastUpdated`); + this.stats.task('VSAC Sync', `Phase 1b: ${lastUpdatedCount} from _lastUpdated`); + + // deduplicate the queue + this.queue = [...new Set(this.queue)]; + let tracking = { totalFetched: 0, totalNew: 0, count: 0, newCount : 0 }; // phase 2: query for history & content this.requeue = []; @@ -418,8 +426,8 @@ class VSACValueSetProvider extends AbstractValueSetProvider { isRefreshing: this.isRefreshing, refreshIntervalHours: this.refreshIntervalHours, nextRefresh: this.refreshTimer && this.lastRefresh - ? new Date(this.lastRefresh.getTime() + (this.refreshIntervalHours * 60 * 60 * 1000)) - : null + ? new Date(this.lastRefresh.getTime() + (this.refreshIntervalHours * 60 * 60 * 1000)) + : null } }; } @@ -506,8 +514,8 @@ class VSACValueSetProvider extends AbstractValueSetProvider { if (bundle.entry && bundle.entry.length > 0) { // Extract ValueSets from bundle entries const valueSets = bundle.entry - .filter(entry => entry.resource && entry.resource.resourceType === 'ValueSet') - .map(entry => entry.resource); + .filter(entry => entry.resource && entry.resource.resourceType === 'ValueSet') + .map(entry => entry.resource); if (valueSets.length > 0) { tracking.totalNew = tracking.totalNew + await this.batchUpsertValueSets(valueSets); tracking.totalFetched += valueSets.length; @@ -519,6 +527,58 @@ class VSACValueSetProvider extends AbstractValueSetProvider { this.stats.task('VSAC Sync', logMsg); } + /** + * Scan VSAC for recently updated value sets using the _lastUpdated parameter. + * Uses a stored date from the previous run; if none exists, defaults to 4 days ago. + * Adds any found URLs to this.queue and stores the server's response date for next time. + * @returns {Promise} Number of value set URLs added to the queue + * @private + */ + async _scanLastUpdated() { + const SETTING_KEY = 'vsac_last_updated_date'; + + let sinceDate = await this.database.getSetting(SETTING_KEY); + if (!sinceDate) { + // No stored date — default to 10 days ago + const d = new Date(); + d.setDate(d.getDate() - 10); + sinceDate = d.toISOString(); + } + + let url = `/res/ValueSet/?_lastUpdated=ge${sinceDate}&_offset=0&_count=100&_elements=id,url,version,status`; + let count = 0; + let serverDate = null; + + while (url) { + console.log(`_lastUpdated scan: ${count} found so far`); + this.stats.task('VSAC Sync', `_lastUpdated scan: ${count} found`); + + const bundle = await this._fetchBundle(url); + + // Capture the server's lastUpdated from the first page + if (!serverDate && bundle.meta && bundle.meta.lastUpdated) { + serverDate = bundle.meta.lastUpdated; + } + + for (let be of bundle.entry || []) { + let vs = be.resource; + if (vs && vs.url) { + this.queue.push(vs.url); + count++; + } + } + + url = this._getNextUrl(bundle); + } + + // Store the server date for next run + if (serverDate) { + await this.database.setSetting(SETTING_KEY, serverDate); + } + + return count; + } + name() { return "VSAC"; } @@ -533,38 +593,38 @@ class VSACValueSetProvider extends AbstractValueSetProvider { const rows = await new Promise((resolve, reject) => { db.all( - `SELECT 'vs' AS kind, - url, - version, - date_first_seen AS ts, - NULL AS status, - NULL AS error_message, - NULL AS finished_at, - NULL AS total_fetched, - NULL AS total_new + `SELECT 'vs' AS kind, + url, + version, + date_first_seen AS ts, + NULL AS status, + NULL AS error_message, + NULL AS finished_at, + NULL AS total_fetched, + NULL AS total_new FROM valuesets - WHERE date_first_seen > 0 - UNION ALL - SELECT 'run' AS kind, - NULL, - NULL, - started_at AS ts, - status, - error_message, - finished_at, - total_fetched, - total_new + WHERE date_first_seen > 0 + UNION ALL + SELECT 'run' AS kind, + NULL, + NULL, + started_at AS ts, + status, + error_message, + finished_at, + total_fetched, + total_new FROM vsac_runs - ORDER BY ts DESC - LIMIT 200`, - [], - (err, rows) => err ? reject(err) : resolve(rows) + ORDER BY ts DESC + LIMIT 200`, + [], + (err, rows) => err ? reject(err) : resolve(rows) ); }); const fmt = ts => ts - ? new Date(ts * 1000).toISOString().replace('T', ' ').substring(0, 19) + ' UTC' - : '—'; + ? new Date(ts * 1000).toISOString().replace('T', ' ').substring(0, 19) + ' UTC' + : '—'; let html = '

VSAC Sync History

'; html += ''; diff --git a/tx/workers/expand.js b/tx/workers/expand.js index 28671c12..f287bfb6 100644 --- a/tx/workers/expand.js +++ b/tx/workers/expand.js @@ -1010,7 +1010,7 @@ class ValueSetExpander { cds.clear(); Extensions.checkNoModifiers(cc, 'ValueSetExpander.processCodes', 'set concept reference', vsSrc.vurl); const cctxt = await cs.locate(cc.code, this.allAltCodes); - if (cctxt && cctxt.context && (!this.params.activeOnly || !await cs.isInactive(cctxt)) && await this.passesFilters(cs, cctxt, prep, filters, 0)) { + if (cctxt && cctxt.context && (!this.params.activeOnly || !await cs.isInactive(cctxt.context)) && await this.passesFilters(cs, cctxt.context, prep, filters, 0)) { if (filter.passesDesignations(cds) || filter.passes(cc.code)) { let ov = Extensions.readString(cc, 'http://hl7.org/fhir/StructureDefinition/itemWeight'); if (!ov) {