diff --git a/services/10-token-engine/index.js b/services/10-token-engine/index.js index 7a48b618b..7fd04d6e8 100644 --- a/services/10-token-engine/index.js +++ b/services/10-token-engine/index.js @@ -5,6 +5,7 @@ const express = require('express'); const helmet = require('helmet'); const Decimal = require('decimal.js'); const redis = require('redis'); +const jwt = require('jsonwebtoken'); const app = express(); app.use(helmet()); @@ -12,8 +13,6 @@ app.use(express.json()); const port = process.env.PORT || 3010; const JWT_SECRET = process.env.JWT_SECRET; -app.use(helmet()); - const pgClient = new Client({ connectionString: process.env.DATABASE_URL }); const kafka = new Kafka({ clientId: 'token-engine', @@ -50,6 +49,28 @@ const authenticateToken = (req, res, next) => { const LMP_THRESHOLD_SURPLUS = new Decimal(process.env.LMP_THRESHOLD_SURPLUS || '30.0'); const LMP_THRESHOLD_SCARCITY = new Decimal(process.env.LMP_THRESHOLD_SCARCITY || '100.0'); +/** + * Middleware: Verify JWT token (Zero-Trust Security) + * Hardened to return 500 error if JWT_SECRET is missing. + */ +const authenticateToken = (req, res, next) => { + const authHeader = req.headers['authorization']; + const token = authHeader && authHeader.split(' ')[1]; + + if (!token) return res.status(401).json({ error: 'Access token required' }); + + if (!JWT_SECRET) { + console.error('Security Warning: JWT_SECRET is not configured.'); + return res.status(500).json({ error: 'Internal server configuration error' }); + } + + jwt.verify(token, JWT_SECRET, (err, user) => { + if (err) return res.status(403).json({ error: 'Invalid or expired token' }); + req.user = user; + next(); + }); +}; + // --- Helper Functions for Database Interaction --- async function getRewardRule(actionType) { @@ -274,99 +295,8 @@ app.get('/data/training/rewards', authenticateToken, async (req, res) => { } }); -// --- Asynchronous Reward Batching (L10-P3) --- - -let isBatchProcessing = false; - -async function processBatchMint() { - if (isBatchProcessing) return; - isBatchProcessing = true; - - try { - // Atomic claiming of queued rewards: transitions status to 'processing' to prevent double-minting - const res = await pgClient.query(` - UPDATE token_reward_log - SET status = 'processing' - WHERE log_id IN ( - SELECT log_id - FROM token_reward_log - WHERE status = 'queued' - ORDER BY created_at ASC - LIMIT 10 - FOR UPDATE SKIP LOCKED - ) - RETURNING log_id, driver_id, points_awarded; - `); - - if (res.rows.length === 0) { - isBatchProcessing = false; - return; - } - - console.log(`[L10 Batch] Claimed ${res.rows.length} rewards for processing...`); - - for (const row of res.rows) { - const driverWallet = await getOrCreateDriverWallet(row.driver_id); - try { - const openWalletResponse = await axios.post(`${process.env.OPEN_WALLET_API_URL}/transactions`, { - walletAddress: driverWallet.open_wallet_address, - amount: parseFloat(row.points_awarded), - currency: 'MiGridPoints', - referenceId: row.log_id - }); - await updateRewardTransactionStatus(row.log_id, 'complete', openWalletResponse.data.transactionId); - console.log(`✅ [L10 Batch] Reward minted for log ${row.log_id}: ${row.points_awarded} points.`); - } catch (error) { - console.error(`❌ [L10 Batch] Reward failed for log ${row.log_id}:`, error.message); - // On failure, revert status back to 'queued' or mark as 'failed' based on error type - // For now, we follow platform standard and mark as 'failed' to avoid infinite retry loops without backoff - await updateRewardTransactionStatus(row.log_id, 'failed'); - } - } - } catch (error) { - console.error('[L10 Batch] Error processing batch:', error.message); - } finally { - isBatchProcessing = false; - } -} - // --- Main Application Logic --- -/** - * [L10-P3] Batch Minting Worker - * Periodically processes queued rewards to simulate gas-optimized batch transactions. - */ -async function processBatchMint() { - try { - const queuedRewards = await pgClient.query( - "SELECT log.*, dw.open_wallet_address FROM token_reward_log log JOIN driver_wallets dw ON log.driver_id = dw.driver_id WHERE log.status = 'queued' LIMIT 50;" - ); - - if (queuedRewards.rows.length === 0) return; - - console.log(`[L10 Batch Worker] Processing ${queuedRewards.rows.length} queued rewards...`); - - for (const reward of queuedRewards.rows) { - try { - // 6. Execute Blockchain/Wallet Transaction (Asynchronous Batch) - const openWalletResponse = await axios.post(`${process.env.OPEN_WALLET_API_URL}/transactions`, { - walletAddress: reward.open_wallet_address, - amount: parseFloat(reward.points_awarded), - currency: 'MiGridPoints', - referenceId: reward.log_id - }); - await updateRewardTransactionStatus(reward.log_id, 'complete', openWalletResponse.data.transactionId); - console.log(`✅ [L10 Batch] Processed reward ${reward.log_id} for driver ${reward.driver_id}`); - } catch (error) { - console.error(`❌ [L10 Batch] Failed to process reward ${reward.log_id}:`, error.message); - await updateRewardTransactionStatus(reward.log_id, 'failed'); - } - } - } catch (error) { - console.error('[L10 Batch Worker] Critical Error:', error.message); - } -} - async function start() { try { await pgClient.connect(); @@ -401,127 +331,127 @@ async function start() { console.log(`⚡ Received message from ${topic}:`, payload); - const { - driver_id, - action_type, - source_value, - event_id, - iso: payloadIso, - physics_score, - physicsScore, - is_vpp_event, - isVppEvent, - is_high_fidelity, - isHighFidelity, - is_sentinel_fidelity, - isSentinelFidelity, - confidence_score, - confidenceScore, - resource_type, - resourceType, - site_id, - siteId, - location_id, - locationId - } = payload; - - const vppAligned = !!(is_vpp_event || isVppEvent); - - // Robust Payload Validation and Standardization (Snake_case & CamelCase support) - let physicsScoreVal = physics_score !== undefined ? parseFloat(physics_score) : (physicsScore !== undefined ? parseFloat(physicsScore) : null); - if (physicsScoreVal !== null && isNaN(physicsScoreVal)) physicsScoreVal = null; - - let confidenceScoreVal = confidence_score !== undefined ? parseFloat(confidence_score) : (confidenceScore !== undefined ? parseFloat(confidenceScore) : null); - if (confidenceScoreVal !== null && isNaN(confidenceScoreVal)) confidenceScoreVal = null; - - const isHighFidelityVal = is_high_fidelity !== undefined ? is_high_fidelity : (isHighFidelity !== undefined ? isHighFidelity : false); - const isSentinelFidelityVal = is_sentinel_fidelity !== undefined ? is_sentinel_fidelity : (isSentinelFidelity !== undefined ? isSentinelFidelity : false); - const siteIdVal = site_id || siteId || location_id || locationId || null; - const resourceTypeVal = resource_type || resourceType || 'EV'; - - // 1. Ensure Driver Wallet Exists (and get address) - const driverWallet = await getOrCreateDriverWallet(driver_id); - if (!driverWallet) { - console.error(`❌ Failed to get or create wallet for driver: ${driver_id}`); - return; - } - const iso = (payloadIso || driverWallet.iso || 'CAISO').toUpperCase().replace(/-/g, ''); - - let pointsAwarded = new Decimal(0); - let rule_id; - let multiplierReason = 'Standard Reward'; - - // Robust payload validation and parsing - let physicsScorePersist = (physicsScoreVal !== undefined && physicsScoreVal !== null) ? parseFloat(physicsScoreVal) : null; - let confidenceScorePersist = (confidenceScoreVal !== undefined && confidenceScoreVal !== null) ? parseFloat(confidenceScoreVal) : null; - - if (physicsScoreVal !== undefined && isNaN(physicsScorePersist)) { - console.warn(`[L10 Audit] Received NaN physics_score for event ${event_id}. Skipping.`); - return; - } - - // April 2026 Audit Standard: Explicit high-fidelity flag OR physics OR confidence > 0.95 - let isHighFidelityPersist = (isHighFidelityVal === true || isHighFidelityVal === 'true') || - (physicsScorePersist !== null && physicsScorePersist > 0.95) || - (confidenceScorePersist !== null && confidenceScorePersist > 0.95); - - // L10 v4.3.6 Sentinel Fidelity Tier: physics_score > 0.99 or explicit sentinel flag (supports boolean, string 'true', and integer 1) - let isSentinelFidelityPersist = (isSentinelFidelityVal === true || isSentinelFidelityVal === 'true' || isSentinelFidelityVal === 1) || - (physicsScorePersist !== null && physicsScorePersist > 0.99); - - // Fetch rule early for idempotency check - const rule = await getRewardRule(action_type); - const isBehavioral = action_type === 'challenge_completed' || action_type === 'achievement_unlocked' || action_type === 'grid_response'; - - if (!rule && !isBehavioral) { - console.warn(`⚠️ No active reward rule found for action type: ${action_type}`); - return; - } - rule_id = rule ? rule.rule_id : '00000000-0000-0000-0000-000000000000'; - - // 2. Idempotency Check (Fixed parameter order: driver_id, event_id, rule_id) - const existingReward = await checkIdempotency(driver_id, event_id, rule_id); - if (existingReward) { - console.log(`[L10 Idempotency] Reward already exists for ${action_type} (Event: ${event_id}). Status: ${existingReward.status}. Skipping.`); - return; - } - - if (isBehavioral) { - // Fixed-value rewards (points/tokens) - pointsAwarded = new Decimal(source_value || 0); - console.log(`[L10] Behavioral ${action_type} by driver ${driver_id}. Awarding ${pointsAwarded.toNumber()} tokens. [Resource: ${resourceTypeVal}]`); - } else { - // Proof of Physics Gate: Energy-based rewards must have verified physics - if (physicsScorePersist !== null) { - const fidelityStatus = isHighFidelityPersist ? 'HIGH_FIDELITY' : 'STANDARD'; - - if (physicsScorePersist <= 0.0) { - console.warn(`[L10 Audit] [${fidelityStatus}] Rejected reward for event ${event_id}: Physics Score too low (${physicsScorePersist}). Driver: ${driver_id} [Resource: ${resourceTypeVal}]`); - return; - } - } else { - console.warn(`[L10 Audit] Rejected energy-based reward for event ${event_id}: Physics Score missing. Driver: ${driver_id} [Resource: ${resourceTypeVal}]`); + const { + driver_id, + action_type, + source_value, + event_id, + iso: payloadIso, + physics_score, + physicsScore, + is_vpp_event, + isVppEvent, + is_high_fidelity, + isHighFidelity, + is_sentinel_fidelity, + isSentinelFidelity, + confidence_score, + confidenceScore, + resource_type, + resourceType, + site_id, + siteId, + location_id, + locationId + } = payload; + + const vppAligned = !!(is_vpp_event || isVppEvent); + + // Robust Payload Validation and Standardization (Snake_case & CamelCase support) + let physicsScoreVal = physics_score !== undefined ? parseFloat(physics_score) : (physicsScore !== undefined ? parseFloat(physicsScore) : null); + if (physicsScoreVal !== null && isNaN(physicsScoreVal)) physicsScoreVal = null; + + let confidenceScoreVal = confidence_score !== undefined ? parseFloat(confidence_score) : (confidenceScore !== undefined ? parseFloat(confidenceScore) : null); + if (confidenceScoreVal !== null && isNaN(confidenceScoreVal)) confidenceScoreVal = null; + + const isHighFidelityVal = is_high_fidelity !== undefined ? is_high_fidelity : (isHighFidelity !== undefined ? isHighFidelity : false); + const isSentinelFidelityVal = is_sentinel_fidelity !== undefined ? is_sentinel_fidelity : (isSentinelFidelity !== undefined ? isSentinelFidelity : false); + const siteIdVal = site_id || siteId || location_id || locationId || null; + const resourceTypeVal = resource_type || resourceType || 'EV'; + + // 1. Ensure Driver Wallet Exists (and get address) + const driverWallet = await getOrCreateDriverWallet(driver_id); + if (!driverWallet) { + console.error(`❌ Failed to get or create wallet for driver: ${driver_id}`); + return; + } + const iso = (payloadIso || driverWallet.iso || 'CAISO').toUpperCase().replace(/-/g, ''); + + let pointsAwarded = new Decimal(0); + let rule_id; + let multiplierReason = 'Standard Reward'; + + // Robust payload validation and parsing + let physicsScorePersist = (physicsScoreVal !== undefined && physicsScoreVal !== null) ? parseFloat(physicsScoreVal) : null; + let confidenceScorePersist = (confidenceScoreVal !== undefined && confidenceScoreVal !== null) ? parseFloat(confidenceScoreVal) : null; + + if (physicsScoreVal !== undefined && isNaN(physicsScorePersist)) { + console.warn(`[L10 Audit] Received NaN physics_score for event ${event_id}. Skipping.`); + return; + } + + // April 2026 Audit Standard: Explicit high-fidelity flag OR physics OR confidence > 0.95 + let isHighFidelityPersist = (isHighFidelityVal === true || isHighFidelityVal === 'true') || + (physicsScorePersist !== null && physicsScorePersist > 0.95) || + (confidenceScorePersist !== null && confidenceScorePersist > 0.95); + + // L10 v4.3.6 Sentinel Fidelity Tier: physics_score > 0.99 or explicit sentinel flag (supports boolean, string 'true', and integer 1) + let isSentinelFidelityPersist = (isSentinelFidelityVal === true || isSentinelFidelityVal === 'true' || isSentinelFidelityVal === 1) || + (physicsScorePersist !== null && physicsScorePersist > 0.99); + + // Fetch rule early for idempotency check + const rule = await getRewardRule(action_type); + const isBehavioral = action_type === 'challenge_completed' || action_type === 'achievement_unlocked' || action_type === 'grid_response'; + + if (!rule && !isBehavioral) { + console.warn(`⚠️ No active reward rule found for action type: ${action_type}`); return; } + rule_id = rule ? rule.rule_id : '00000000-0000-0000-0000-000000000000'; - // 2. Calculate Reward with Dynamic Boosting (Energy-based) - const marketMultiplier = await getDynamicMultiplier(iso, action_type, vppAligned); - const siteMultiplier = await getSiteMultiplier(siteIdVal); + // 2. Idempotency Check (Fixed parameter order: driver_id, event_id, rule_id) + const existingReward = await checkIdempotency(driver_id, event_id, rule_id); + if (existingReward) { + console.log(`[L10 Idempotency] Reward already exists for ${action_type} (Event: ${event_id}). Status: ${existingReward.status}. Skipping.`); + return; + } - // Compound Multipliers - const totalMultiplier = marketMultiplier.multiplier.times(siteMultiplier.multiplier); - multiplierReason = marketMultiplier.multiplier.eq(1.0) ? siteMultiplier.reason : `${marketMultiplier.reason} + ${siteMultiplier.reason}`; + if (isBehavioral) { + // Fixed-value rewards (points/tokens) + pointsAwarded = new Decimal(source_value || 0); + console.log(`[L10] Behavioral ${action_type} by driver ${driver_id}. Awarding ${pointsAwarded.toNumber()} tokens. [Resource: ${resourceTypeVal}]`); + } else { + // Proof of Physics Gate: Energy-based rewards must have verified physics + if (physicsScorePersist !== null) { + const fidelityStatus = isHighFidelityPersist ? 'HIGH_FIDELITY' : 'STANDARD'; + + if (physicsScorePersist <= 0.0) { + console.warn(`[L10 Audit] [${fidelityStatus}] Rejected reward for event ${event_id}: Physics Score too low (${physicsScorePersist}). Driver: ${driver_id} [Resource: ${resourceTypeVal}]`); + return; + } + } else { + console.warn(`[L10 Audit] Rejected energy-based reward for event ${event_id}: Physics Score missing. Driver: ${driver_id} [Resource: ${resourceTypeVal}]`); + return; + } + + // 2. Calculate Reward with Dynamic Boosting (Energy-based) + const marketMultiplier = await getDynamicMultiplier(iso, action_type, vppAligned); + const siteMultiplier = await getSiteMultiplier(siteIdVal); + + // Compound Multipliers + const totalMultiplier = marketMultiplier.multiplier.times(siteMultiplier.multiplier); + multiplierReason = marketMultiplier.multiplier.eq(1.0) ? siteMultiplier.reason : `${marketMultiplier.reason} + ${siteMultiplier.reason}`; - const baseReward = new Decimal(source_value || 0).times(rule.reward_multiplier); - pointsAwarded = baseReward.times(totalMultiplier).toDecimalPlaces(8); + const baseReward = new Decimal(source_value || 0).times(rule.reward_multiplier); + pointsAwarded = baseReward.times(totalMultiplier).toDecimalPlaces(8); - console.log(`[L10] Reward calculated: ${pointsAwarded.toNumber()} points (Source: ${source_value}, Rule Mult: ${rule.reward_multiplier}, Total Mult: ${totalMultiplier.toNumber()})`); - } + console.log(`[L10] Reward calculated: ${pointsAwarded.toNumber()} points (Source: ${source_value}, Rule Mult: ${rule.reward_multiplier}, Total Mult: ${totalMultiplier.toNumber()})`); + } - if (pointsAwarded.isZero()) { - console.log(`[L10] Reward is zero for event ${event_id}, skipping.`); - return; - } + if (pointsAwarded.isZero()) { + console.log(`[L10] Reward is zero for event ${event_id}, skipping.`); + return; + } // 4. Log the Reward (queued for batch minting) await logRewardTransaction( diff --git a/services/10-token-engine/package.json b/services/10-token-engine/package.json index cc6c91920..6f5e6d282 100644 --- a/services/10-token-engine/package.json +++ b/services/10-token-engine/package.json @@ -7,6 +7,7 @@ "decimal.js": "^10.4.3", "express": "^4.18.2", "helmet": "^8.1.0", + "jsonwebtoken": "^9.0.3", "kafkajs": "^2.2.4", "pg": "^8.11.0", "redis": "^4.6.10"