Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/bitcore-node/src/models/baseTransaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export interface ITransaction {
chain: string;
network: string;
blockHeight?: number;
_block?: ObjectID;
blockHash?: string;
blockTime?: Date;
blockTimeNormalized?: Date;
Expand All @@ -32,7 +33,7 @@ export abstract class BaseTransaction<T extends ITransaction> extends BaseModel<
onConnect() {
this.collection.createIndex({ txid: 1 }, { background: true });
this.collection.createIndex({ chain: 1, network: 1, blockHeight: 1 }, { background: true });
this.collection.createIndex({ blockHash: 1 }, { background: true });
this.collection.createIndex({ _block: 1 }, { background: true });
this.collection.createIndex({ chain: 1, network: 1, blockTimeNormalized: 1 }, { background: true });
this.collection.createIndex(
{ wallets: 1, blockTimeNormalized: 1 },
Expand Down
3 changes: 2 additions & 1 deletion packages/bitcore-node/src/models/block.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export class BitcoinBlock extends BaseBlock<IBtcBlock> {

const previousBlock = await this.collection.findOne({ hash: convertedBlock.previousBlockHash, chain, network });

await this.collection.bulkWrite([blockOp]);
const bulkInsert = await this.collection.bulkWrite([blockOp]);
if (previousBlock) {
await this.collection.updateOne(
{ chain, network, hash: previousBlock.hash },
Expand All @@ -68,6 +68,7 @@ export class BitcoinBlock extends BaseBlock<IBtcBlock> {

await TransactionStorage.batchImport({
txs: block.transactions,
_block: bulkInsert.upsertedIds[0],
blockHash: convertedBlock.hash,
blockTime: new Date(time),
blockTimeNormalized: new Date(timeNormalized),
Expand Down
10 changes: 6 additions & 4 deletions packages/bitcore-node/src/models/coin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export interface ICoin {
network: string;
chain: string;
mintTxid: string;
_mintTx: ObjectID;
mintIndex: number;
mintHeight: number;
coinbase: boolean;
Expand All @@ -18,6 +19,7 @@ export interface ICoin {
script: Buffer;
wallets: Array<ObjectID>;
spentTxid: string;
_spentTx?: ObjectID;
spentHeight: number;
confirmations?: number;
sequenceNumber?: number;
Expand All @@ -35,7 +37,7 @@ export class CoinModel extends BaseModel<ICoin> {
];

onConnect() {
this.collection.createIndex({ mintTxid: 1, mintIndex: 1 }, { background: true });
this.collection.createIndex({ _mintTx: 1, mintIndex: 1 }, { background: true });
this.collection.createIndex(
{ address: 1, chain: 1, network: 1 },
{
Expand All @@ -47,18 +49,18 @@ export class CoinModel extends BaseModel<ICoin> {
);
this.collection.createIndex({ address: 1 }, { background: true });
this.collection.createIndex({ chain: 1, network: 1, mintHeight: 1 }, { background: true });
this.collection.createIndex({ spentTxid: 1 }, { background: true, sparse: true });
this.collection.createIndex({ _spentTx: 1 }, { background: true, sparse: true });
this.collection.createIndex({ chain: 1, network: 1, spentHeight: 1 }, { background: true });
this.collection.createIndex(
{ wallets: 1, spentHeight: 1, value: 1, mintHeight: 1 },
{ background: true, partialFilterExpression: { 'wallets.0': { $exists: true } } }
);
this.collection.createIndex(
{ wallets: 1, spentTxid: 1 },
{ wallets: 1, _spentTx: 1 },
{ background: true, partialFilterExpression: { 'wallets.0': { $exists: true } } }
);
this.collection.createIndex(
{ wallets: 1, mintTxid: 1 },
{ wallets: 1, _mintTx: 1 },
{ background: true, partialFilterExpression: { 'wallets.0': { $exists: true } } }
);
}
Expand Down
101 changes: 61 additions & 40 deletions packages/bitcore-node/src/models/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export type TaggedBitcoinTx = BitcoinTransaction & { wallets: Array<ObjectID> };
export interface MintOp {
updateOne: {
filter: {
mintTxid: string;
_mintTx: ObjectID;
mintIndex: number;
chain: string;
network: string;
Expand All @@ -54,6 +54,7 @@ export interface MintOp {
value: number;
script: Buffer;
spentTxid?: string;
_spentTx?: ObjectID;
spentHeight?: SpentHeightIndicators;
wallets?: Array<ObjectID>;
};
Expand All @@ -70,13 +71,13 @@ export interface MintOp {
export interface SpendOp {
updateOne: {
filter: {
mintTxid: string;
_mintTx: ObjectID;
mintIndex: number;
spentHeight: { $lt: SpentHeightIndicators };
chain: string;
network: string;
};
update: { $set: { spentTxid: string; spentHeight: number } };
update: { $set: { _spentTx: ObjectID; spentTxid: string; spentHeight: number } };
};
}

Expand All @@ -89,6 +90,7 @@ export interface TxOp {
network: string;
blockHeight: number;
blockHash?: string;
_block?: ObjectID;
blockTime?: Date;
blockTimeNormalized?: Date;
coinbase: boolean;
Expand All @@ -101,7 +103,7 @@ export interface TxOp {
wallets: Array<ObjectID>;
mempoolTime?: Date;
};
$setOnInsert?: TxOp['updateOne']['update']['$set'];
$setOnInsert?: Partial<TxOp['updateOne']['update']['$set'] & { _id: ObjectID }>;
};
upsert: true;
forceServerObjectId: true;
Expand Down Expand Up @@ -205,6 +207,7 @@ export class TransactionModel extends BaseTransaction<IBtcTransaction> {
height: number;
mempoolTime?: Date;
blockTime?: Date;
_block?: ObjectID;
blockHash?: string;
blockTimeNormalized?: Date;
parentChain?: string;
Expand Down Expand Up @@ -238,7 +241,7 @@ export class TransactionModel extends BaseTransaction<IBtcTransaction> {
.on('finish', r)
);

this.streamSpendOps({ ...params, spentStream });
await this.streamSpendOps({ ...params, spentStream });
await new Promise(r =>
spentStream
.pipe(new PruneMempoolStream(chain, network, initialSyncComplete))
Expand All @@ -257,10 +260,11 @@ export class TransactionModel extends BaseTransaction<IBtcTransaction> {
}

async streamTxOps(params: {
txs: Array<TaggedBitcoinTx>;
txs: Array<MongoBound<TaggedBitcoinTx>>;
height: number;
blockTime?: Date;
blockHash?: string;
_block?: ObjectID;
blockTimeNormalized?: Date;
parentChain?: string;
forkHeight?: number;
Expand All @@ -271,6 +275,7 @@ export class TransactionModel extends BaseTransaction<IBtcTransaction> {
txStream: Readable;
}) {
let {
_block,
blockHash,
blockTime,
blockTimeNormalized,
Expand Down Expand Up @@ -368,6 +373,7 @@ export class TransactionModel extends BaseTransaction<IBtcTransaction> {
network,
blockHeight: height,
blockHash,
_block,
blockTime,
blockTimeNormalized,
coinbase: tx.isCoinbase(),
Expand All @@ -379,6 +385,9 @@ export class TransactionModel extends BaseTransaction<IBtcTransaction> {
value: tx.outputAmount,
wallets,
...(mempoolTime && { mempoolTime })
},
$setOnInsert: {
_id: tx._id!
}
},
upsert: true,
Expand Down Expand Up @@ -442,8 +451,8 @@ export class TransactionModel extends BaseTransaction<IBtcTransaction> {
}
}

for (let tx of params.txs as Array<TaggedBitcoinTx>) {
const coinsForTx = mintBatch.filter(mint => mint.updateOne.filter.mintTxid === tx._hash!);
for (let tx of params.txs as Array<MongoBound<TaggedBitcoinTx>>) {
const coinsForTx = mintBatch.filter(mint => mint.updateOne.filter._mintTx === tx._id!);
tx.wallets = coinsForTx.reduce((wallets, c) => {
wallets = wallets.concat(c.updateOne.update.$set.wallets!);
return wallets;
Expand All @@ -454,7 +463,7 @@ export class TransactionModel extends BaseTransaction<IBtcTransaction> {
}

async streamMintOps(params: {
txs: Array<BitcoinTransaction>;
txs: Array<MongoBound<BitcoinTransaction>>;
height: number;
parentChain?: string;
forkHeight?: number;
Expand Down Expand Up @@ -482,6 +491,7 @@ export class TransactionModel extends BaseTransaction<IBtcTransaction> {
let mintBatch = new Array<MintOp>();
for (let tx of params.txs) {
tx._hash = tx.hash;
tx._id = new ObjectID();
let isCoinbase = tx.isCoinbase();
for (let [index, output] of tx.outputs.entries()) {
if (
Expand All @@ -505,7 +515,7 @@ export class TransactionModel extends BaseTransaction<IBtcTransaction> {
mintBatch.push({
updateOne: {
filter: {
mintTxid: tx._hash,
_mintTx: tx._id,
mintIndex: index,
chain,
network
Expand Down Expand Up @@ -544,42 +554,52 @@ export class TransactionModel extends BaseTransaction<IBtcTransaction> {
mintBatch = new Array<MintOp>();
}

streamSpendOps(params: {
txs: Array<BitcoinTransaction>;
async streamSpendOps(params: {
txs: Array<MongoBound<BitcoinTransaction>>;
height: number;
parentChain?: string;
forkHeight?: number;
chain: string;
network: string;
spentStream: Readable;
}) {
let { chain, network, height, parentChain, forkHeight } = params;
let { chain, network, height, parentChain, forkHeight, txs } = params;
if (parentChain && forkHeight && height < forkHeight) {
params.spentStream.push(null);
return;
}
let spendOpsBatch = new Array<SpendOp>();
for (let tx of params.txs) {
for (let tx of txs) {
if (tx.isCoinbase()) {
continue;
}
for (let input of tx.inputs) {
let inputObj = input.toObject();
const updateQuery = {
updateOne: {
filter: {
mintTxid: inputObj.prevTxId,
mintIndex: inputObj.outputIndex,
spentHeight: { $lt: SpentHeightIndicators.minimum },
chain,
network
},
update: {
$set: { spentTxid: tx._hash || tx.hash, spentHeight: height, sequenceNumber: inputObj.sequenceNumber }
const spentTxid = inputObj.prevTxId;
const found = txs.find(t => t._hash === spentTxid);
const spentTx = found || (await TransactionStorage.collection.findOne({ chain, network, txid: spentTxid }));
if (spentTx) {
const updateQuery = {
updateOne: {
filter: {
_mintTx: spentTx!._id!,
mintIndex: inputObj.outputIndex,
spentHeight: { $lt: SpentHeightIndicators.minimum },
chain,
network
},
update: {
$set: {
_spentTx: tx._id!,
spentTxid: tx._hash || tx.hash,
spentHeight: height,
sequenceNumber: inputObj.sequenceNumber
}
}
}
}
};
spendOpsBatch.push(updateQuery);
};
spendOpsBatch.push(updateQuery);
}
}
if (spendOpsBatch.length > MAX_BATCH_SIZE) {
params.spentStream.push(spendOpsBatch);
Expand All @@ -593,19 +613,20 @@ export class TransactionModel extends BaseTransaction<IBtcTransaction> {
spendOpsBatch = new Array<SpendOp>();
}

async findAllRelatedOutputs(forTx: string) {
async findAllRelatedOutputs(forTx: ObjectID) {
const seen = {};
const getOutputs = (txid: string) =>
CoinStorage.collection.find({ mintTxid: txid, mintHeight: { $ne: -3 } }).toArray();
const getOutputs = async (txid: ObjectID) => {
return CoinStorage.collection.find({ _mintTx: txid, mintHeight: { $ne: -3 } }).toArray();
};
let batch = await getOutputs(forTx);
let allRelatedCoins = new Array<ICoin>();
while (batch.length) {
allRelatedCoins = allRelatedCoins.concat(batch);
let newBatch = new Array<ICoin>();
for (const coin of batch) {
seen[coin.mintTxid] = true;
if (coin.spentTxid && !seen[coin.spentTxid]) {
const outputs = await getOutputs(coin.spentTxid);
if (coin._spentTx && !seen[coin.spentTxid]) {
const outputs = await getOutputs(coin._spentTx);
newBatch = newBatch.concat(outputs);
}
}
Expand Down Expand Up @@ -651,34 +672,34 @@ export class TransactionModel extends BaseTransaction<IBtcTransaction> {
chain,
network,
spentHeight: SpentHeightIndicators.pending,
mintTxid: { $in: spendOps.map(s => s.updateOne.filter.mintTxid) }
_mintTx: { $in: spendOps.map(s => s.updateOne.filter._mintTx) }
})
.project({ mintTxid: 1, mintIndex: 1, spentTxid: 1 })
.project({ _mintTx: 1, mintIndex: 1, spentTxid: 1, _spentTx: 1 })
.toArray();
coins = coins.filter(
c =>
spendOps.findIndex(
s =>
s.updateOne.filter.mintTxid === c.mintTxid &&
s.updateOne.filter._mintTx === c._mintTx &&
s.updateOne.filter.mintIndex === c.mintIndex &&
s.updateOne.update.$set.spentTxid !== c.spentTxid
) > -1
);

const invalidatedTxids = Array.from(new Set(coins.map(c => c.spentTxid)));
const invalidatedTxids = Array.from(new Set(coins.map(c => c._spentTx!)));

for (const txid of invalidatedTxids) {
const allRelatedCoins = await this.findAllRelatedOutputs(txid);
const spentTxids = new Set(allRelatedCoins.filter(c => c.spentTxid).map(c => c.spentTxid));
const spentTxids = new Set(allRelatedCoins.filter(c => c._spentTx).map(c => c._spentTx!));
const txids = [txid].concat(Array.from(spentTxids));
await Promise.all([
this.collection.update(
{ chain, network, txid: { $in: txids } },
{ chain, network, _id: { $in: txids } },
{ $set: { blockHeight: SpentHeightIndicators.conflicting } },
{ multi: true }
),
CoinStorage.collection.update(
{ chain, network, mintTxid: { $in: txids } },
{ chain, network, _mintTx: { $in: txids } },
{ $set: { mintHeight: SpentHeightIndicators.conflicting } },
{ multi: true }
)
Expand Down
Loading