Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
344 changes: 137 additions & 207 deletions services/10-token-engine/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@ const express = require('express');
const helmet = require('helmet');
const Decimal = require('decimal.js');
const redis = require('redis');
const jwt = require('jsonwebtoken');

const app = express();
app.use(helmet());
app.use(express.json());
const port = process.env.PORT || 3010;
const JWT_SECRET = process.env.JWT_SECRET;

app.use(helmet());

const pgClient = new Client({ connectionString: process.env.DATABASE_URL });
const kafka = new Kafka({
clientId: 'token-engine',
Expand Down Expand Up @@ -50,6 +49,28 @@ const authenticateToken = (req, res, next) => {
const LMP_THRESHOLD_SURPLUS = new Decimal(process.env.LMP_THRESHOLD_SURPLUS || '30.0');
const LMP_THRESHOLD_SCARCITY = new Decimal(process.env.LMP_THRESHOLD_SCARCITY || '100.0');

/**
* Middleware: Verify JWT token (Zero-Trust Security)
* Hardened to return 500 error if JWT_SECRET is missing.
*/
const authenticateToken = (req, res, next) => {
const authHeader = req.headers['authorization'];
const token = authHeader && authHeader.split(' ')[1];

if (!token) return res.status(401).json({ error: 'Access token required' });

if (!JWT_SECRET) {
console.error('Security Warning: JWT_SECRET is not configured.');
return res.status(500).json({ error: 'Internal server configuration error' });
}

jwt.verify(token, JWT_SECRET, (err, user) => {
if (err) return res.status(403).json({ error: 'Invalid or expired token' });
req.user = user;
next();
});
};

// --- Helper Functions for Database Interaction ---

async function getRewardRule(actionType) {
Expand Down Expand Up @@ -274,99 +295,8 @@ app.get('/data/training/rewards', authenticateToken, async (req, res) => {
}
});

// --- Asynchronous Reward Batching (L10-P3) ---

let isBatchProcessing = false;

async function processBatchMint() {
if (isBatchProcessing) return;
isBatchProcessing = true;

try {
// Atomic claiming of queued rewards: transitions status to 'processing' to prevent double-minting
const res = await pgClient.query(`
UPDATE token_reward_log
SET status = 'processing'
WHERE log_id IN (
SELECT log_id
FROM token_reward_log
WHERE status = 'queued'
ORDER BY created_at ASC
LIMIT 10
FOR UPDATE SKIP LOCKED
)
RETURNING log_id, driver_id, points_awarded;
`);

if (res.rows.length === 0) {
isBatchProcessing = false;
return;
}

console.log(`[L10 Batch] Claimed ${res.rows.length} rewards for processing...`);

for (const row of res.rows) {
const driverWallet = await getOrCreateDriverWallet(row.driver_id);
try {
const openWalletResponse = await axios.post(`${process.env.OPEN_WALLET_API_URL}/transactions`, {
walletAddress: driverWallet.open_wallet_address,
amount: parseFloat(row.points_awarded),
currency: 'MiGridPoints',
referenceId: row.log_id
});
await updateRewardTransactionStatus(row.log_id, 'complete', openWalletResponse.data.transactionId);
console.log(`✅ [L10 Batch] Reward minted for log ${row.log_id}: ${row.points_awarded} points.`);
} catch (error) {
console.error(`❌ [L10 Batch] Reward failed for log ${row.log_id}:`, error.message);
// On failure, revert status back to 'queued' or mark as 'failed' based on error type
// For now, we follow platform standard and mark as 'failed' to avoid infinite retry loops without backoff
await updateRewardTransactionStatus(row.log_id, 'failed');
}
}
} catch (error) {
console.error('[L10 Batch] Error processing batch:', error.message);
} finally {
isBatchProcessing = false;
}
}

// --- Main Application Logic ---

/**
* [L10-P3] Batch Minting Worker
* Periodically processes queued rewards to simulate gas-optimized batch transactions.
*/
async function processBatchMint() {
try {
const queuedRewards = await pgClient.query(
"SELECT log.*, dw.open_wallet_address FROM token_reward_log log JOIN driver_wallets dw ON log.driver_id = dw.driver_id WHERE log.status = 'queued' LIMIT 50;"
);

if (queuedRewards.rows.length === 0) return;

console.log(`[L10 Batch Worker] Processing ${queuedRewards.rows.length} queued rewards...`);

for (const reward of queuedRewards.rows) {
try {
// 6. Execute Blockchain/Wallet Transaction (Asynchronous Batch)
const openWalletResponse = await axios.post(`${process.env.OPEN_WALLET_API_URL}/transactions`, {
walletAddress: reward.open_wallet_address,
amount: parseFloat(reward.points_awarded),
currency: 'MiGridPoints',
referenceId: reward.log_id
});
await updateRewardTransactionStatus(reward.log_id, 'complete', openWalletResponse.data.transactionId);
console.log(`✅ [L10 Batch] Processed reward ${reward.log_id} for driver ${reward.driver_id}`);
} catch (error) {
console.error(`❌ [L10 Batch] Failed to process reward ${reward.log_id}:`, error.message);
await updateRewardTransactionStatus(reward.log_id, 'failed');
}
}
} catch (error) {
console.error('[L10 Batch Worker] Critical Error:', error.message);
}
}

async function start() {
try {
await pgClient.connect();
Expand Down Expand Up @@ -401,127 +331,127 @@ async function start() {

console.log(`⚡ Received message from ${topic}:`, payload);

const {
driver_id,
action_type,
source_value,
event_id,
iso: payloadIso,
physics_score,
physicsScore,
is_vpp_event,
isVppEvent,
is_high_fidelity,
isHighFidelity,
is_sentinel_fidelity,
isSentinelFidelity,
confidence_score,
confidenceScore,
resource_type,
resourceType,
site_id,
siteId,
location_id,
locationId
} = payload;

const vppAligned = !!(is_vpp_event || isVppEvent);

// Robust Payload Validation and Standardization (Snake_case & CamelCase support)
let physicsScoreVal = physics_score !== undefined ? parseFloat(physics_score) : (physicsScore !== undefined ? parseFloat(physicsScore) : null);
if (physicsScoreVal !== null && isNaN(physicsScoreVal)) physicsScoreVal = null;

let confidenceScoreVal = confidence_score !== undefined ? parseFloat(confidence_score) : (confidenceScore !== undefined ? parseFloat(confidenceScore) : null);
if (confidenceScoreVal !== null && isNaN(confidenceScoreVal)) confidenceScoreVal = null;

const isHighFidelityVal = is_high_fidelity !== undefined ? is_high_fidelity : (isHighFidelity !== undefined ? isHighFidelity : false);
const isSentinelFidelityVal = is_sentinel_fidelity !== undefined ? is_sentinel_fidelity : (isSentinelFidelity !== undefined ? isSentinelFidelity : false);
const siteIdVal = site_id || siteId || location_id || locationId || null;
const resourceTypeVal = resource_type || resourceType || 'EV';

// 1. Ensure Driver Wallet Exists (and get address)
const driverWallet = await getOrCreateDriverWallet(driver_id);
if (!driverWallet) {
console.error(`❌ Failed to get or create wallet for driver: ${driver_id}`);
return;
}
const iso = (payloadIso || driverWallet.iso || 'CAISO').toUpperCase().replace(/-/g, '');

let pointsAwarded = new Decimal(0);
let rule_id;
let multiplierReason = 'Standard Reward';

// Robust payload validation and parsing
let physicsScorePersist = (physicsScoreVal !== undefined && physicsScoreVal !== null) ? parseFloat(physicsScoreVal) : null;
let confidenceScorePersist = (confidenceScoreVal !== undefined && confidenceScoreVal !== null) ? parseFloat(confidenceScoreVal) : null;

if (physicsScoreVal !== undefined && isNaN(physicsScorePersist)) {
console.warn(`[L10 Audit] Received NaN physics_score for event ${event_id}. Skipping.`);
return;
}

// April 2026 Audit Standard: Explicit high-fidelity flag OR physics OR confidence > 0.95
let isHighFidelityPersist = (isHighFidelityVal === true || isHighFidelityVal === 'true') ||
(physicsScorePersist !== null && physicsScorePersist > 0.95) ||
(confidenceScorePersist !== null && confidenceScorePersist > 0.95);

// L10 v4.3.6 Sentinel Fidelity Tier: physics_score > 0.99 or explicit sentinel flag (supports boolean, string 'true', and integer 1)
let isSentinelFidelityPersist = (isSentinelFidelityVal === true || isSentinelFidelityVal === 'true' || isSentinelFidelityVal === 1) ||
(physicsScorePersist !== null && physicsScorePersist > 0.99);

// Fetch rule early for idempotency check
const rule = await getRewardRule(action_type);
const isBehavioral = action_type === 'challenge_completed' || action_type === 'achievement_unlocked' || action_type === 'grid_response';

if (!rule && !isBehavioral) {
console.warn(`⚠️ No active reward rule found for action type: ${action_type}`);
return;
}
rule_id = rule ? rule.rule_id : '00000000-0000-0000-0000-000000000000';

// 2. Idempotency Check (Fixed parameter order: driver_id, event_id, rule_id)
const existingReward = await checkIdempotency(driver_id, event_id, rule_id);
if (existingReward) {
console.log(`[L10 Idempotency] Reward already exists for ${action_type} (Event: ${event_id}). Status: ${existingReward.status}. Skipping.`);
return;
}

if (isBehavioral) {
// Fixed-value rewards (points/tokens)
pointsAwarded = new Decimal(source_value || 0);
console.log(`[L10] Behavioral ${action_type} by driver ${driver_id}. Awarding ${pointsAwarded.toNumber()} tokens. [Resource: ${resourceTypeVal}]`);
} else {
// Proof of Physics Gate: Energy-based rewards must have verified physics
if (physicsScorePersist !== null) {
const fidelityStatus = isHighFidelityPersist ? 'HIGH_FIDELITY' : 'STANDARD';

if (physicsScorePersist <= 0.0) {
console.warn(`[L10 Audit] [${fidelityStatus}] Rejected reward for event ${event_id}: Physics Score too low (${physicsScorePersist}). Driver: ${driver_id} [Resource: ${resourceTypeVal}]`);
return;
}
} else {
console.warn(`[L10 Audit] Rejected energy-based reward for event ${event_id}: Physics Score missing. Driver: ${driver_id} [Resource: ${resourceTypeVal}]`);
const {
driver_id,
action_type,
source_value,
event_id,
iso: payloadIso,
physics_score,
physicsScore,
is_vpp_event,
isVppEvent,
is_high_fidelity,
isHighFidelity,
is_sentinel_fidelity,
isSentinelFidelity,
confidence_score,
confidenceScore,
resource_type,
resourceType,
site_id,
siteId,
location_id,
locationId
} = payload;

const vppAligned = !!(is_vpp_event || isVppEvent);

// Robust Payload Validation and Standardization (Snake_case & CamelCase support)
let physicsScoreVal = physics_score !== undefined ? parseFloat(physics_score) : (physicsScore !== undefined ? parseFloat(physicsScore) : null);
if (physicsScoreVal !== null && isNaN(physicsScoreVal)) physicsScoreVal = null;

let confidenceScoreVal = confidence_score !== undefined ? parseFloat(confidence_score) : (confidenceScore !== undefined ? parseFloat(confidenceScore) : null);
if (confidenceScoreVal !== null && isNaN(confidenceScoreVal)) confidenceScoreVal = null;

const isHighFidelityVal = is_high_fidelity !== undefined ? is_high_fidelity : (isHighFidelity !== undefined ? isHighFidelity : false);
const isSentinelFidelityVal = is_sentinel_fidelity !== undefined ? is_sentinel_fidelity : (isSentinelFidelity !== undefined ? isSentinelFidelity : false);
const siteIdVal = site_id || siteId || location_id || locationId || null;
const resourceTypeVal = resource_type || resourceType || 'EV';

// 1. Ensure Driver Wallet Exists (and get address)
const driverWallet = await getOrCreateDriverWallet(driver_id);
if (!driverWallet) {
console.error(`❌ Failed to get or create wallet for driver: ${driver_id}`);
return;
}
const iso = (payloadIso || driverWallet.iso || 'CAISO').toUpperCase().replace(/-/g, '');

let pointsAwarded = new Decimal(0);
let rule_id;
let multiplierReason = 'Standard Reward';

// Robust payload validation and parsing
let physicsScorePersist = (physicsScoreVal !== undefined && physicsScoreVal !== null) ? parseFloat(physicsScoreVal) : null;
let confidenceScorePersist = (confidenceScoreVal !== undefined && confidenceScoreVal !== null) ? parseFloat(confidenceScoreVal) : null;

if (physicsScoreVal !== undefined && isNaN(physicsScorePersist)) {
console.warn(`[L10 Audit] Received NaN physics_score for event ${event_id}. Skipping.`);
return;
}

// April 2026 Audit Standard: Explicit high-fidelity flag OR physics OR confidence > 0.95
let isHighFidelityPersist = (isHighFidelityVal === true || isHighFidelityVal === 'true') ||
(physicsScorePersist !== null && physicsScorePersist > 0.95) ||
(confidenceScorePersist !== null && confidenceScorePersist > 0.95);

// L10 v4.3.6 Sentinel Fidelity Tier: physics_score > 0.99 or explicit sentinel flag (supports boolean, string 'true', and integer 1)
let isSentinelFidelityPersist = (isSentinelFidelityVal === true || isSentinelFidelityVal === 'true' || isSentinelFidelityVal === 1) ||
(physicsScorePersist !== null && physicsScorePersist > 0.99);

// Fetch rule early for idempotency check
const rule = await getRewardRule(action_type);
const isBehavioral = action_type === 'challenge_completed' || action_type === 'achievement_unlocked' || action_type === 'grid_response';

if (!rule && !isBehavioral) {
console.warn(`⚠️ No active reward rule found for action type: ${action_type}`);
return;
}
rule_id = rule ? rule.rule_id : '00000000-0000-0000-0000-000000000000';

// 2. Calculate Reward with Dynamic Boosting (Energy-based)
const marketMultiplier = await getDynamicMultiplier(iso, action_type, vppAligned);
const siteMultiplier = await getSiteMultiplier(siteIdVal);
// 2. Idempotency Check (Fixed parameter order: driver_id, event_id, rule_id)
const existingReward = await checkIdempotency(driver_id, event_id, rule_id);
if (existingReward) {
console.log(`[L10 Idempotency] Reward already exists for ${action_type} (Event: ${event_id}). Status: ${existingReward.status}. Skipping.`);
return;
}

// Compound Multipliers
const totalMultiplier = marketMultiplier.multiplier.times(siteMultiplier.multiplier);
multiplierReason = marketMultiplier.multiplier.eq(1.0) ? siteMultiplier.reason : `${marketMultiplier.reason} + ${siteMultiplier.reason}`;
if (isBehavioral) {
// Fixed-value rewards (points/tokens)
pointsAwarded = new Decimal(source_value || 0);
console.log(`[L10] Behavioral ${action_type} by driver ${driver_id}. Awarding ${pointsAwarded.toNumber()} tokens. [Resource: ${resourceTypeVal}]`);
} else {
// Proof of Physics Gate: Energy-based rewards must have verified physics
if (physicsScorePersist !== null) {
const fidelityStatus = isHighFidelityPersist ? 'HIGH_FIDELITY' : 'STANDARD';

if (physicsScorePersist <= 0.0) {
console.warn(`[L10 Audit] [${fidelityStatus}] Rejected reward for event ${event_id}: Physics Score too low (${physicsScorePersist}). Driver: ${driver_id} [Resource: ${resourceTypeVal}]`);
return;
}
} else {
console.warn(`[L10 Audit] Rejected energy-based reward for event ${event_id}: Physics Score missing. Driver: ${driver_id} [Resource: ${resourceTypeVal}]`);
return;
}

// 2. Calculate Reward with Dynamic Boosting (Energy-based)
const marketMultiplier = await getDynamicMultiplier(iso, action_type, vppAligned);
const siteMultiplier = await getSiteMultiplier(siteIdVal);

// Compound Multipliers
const totalMultiplier = marketMultiplier.multiplier.times(siteMultiplier.multiplier);
multiplierReason = marketMultiplier.multiplier.eq(1.0) ? siteMultiplier.reason : `${marketMultiplier.reason} + ${siteMultiplier.reason}`;

const baseReward = new Decimal(source_value || 0).times(rule.reward_multiplier);
pointsAwarded = baseReward.times(totalMultiplier).toDecimalPlaces(8);
const baseReward = new Decimal(source_value || 0).times(rule.reward_multiplier);
pointsAwarded = baseReward.times(totalMultiplier).toDecimalPlaces(8);

console.log(`[L10] Reward calculated: ${pointsAwarded.toNumber()} points (Source: ${source_value}, Rule Mult: ${rule.reward_multiplier}, Total Mult: ${totalMultiplier.toNumber()})`);
}
console.log(`[L10] Reward calculated: ${pointsAwarded.toNumber()} points (Source: ${source_value}, Rule Mult: ${rule.reward_multiplier}, Total Mult: ${totalMultiplier.toNumber()})`);
}

if (pointsAwarded.isZero()) {
console.log(`[L10] Reward is zero for event ${event_id}, skipping.`);
return;
}
if (pointsAwarded.isZero()) {
console.log(`[L10] Reward is zero for event ${event_id}, skipping.`);
return;
}

// 4. Log the Reward (queued for batch minting)
await logRewardTransaction(
Expand Down
1 change: 1 addition & 0 deletions services/10-token-engine/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"decimal.js": "^10.4.3",
"express": "^4.18.2",
"helmet": "^8.1.0",
"jsonwebtoken": "^9.0.3",
"kafkajs": "^2.2.4",
"pg": "^8.11.0",
"redis": "^4.6.10"
Expand Down