Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# jettonswaps
Bot for detect jetton swaps(only swaps, not liquidity)
Bot for detect jetton swaps (only swaps, not liquidity) at [DeDust.io](https://dedust.io/)

### Installing
```bash
Expand Down
129 changes: 72 additions & 57 deletions src/block_scanner.py
Original file line number Diff line number Diff line change
@@ -1,99 +1,114 @@
import asyncio
import datetime
import hashlib
import logging
import time
import typing
from types import coroutine
from collections import deque

from pytoniq_core import Cell, Slice
from pytoniq_core.tlb.block import ExtBlkRef

from pytoniq.liteclient import LiteClient
from pytoniq_core.tlb import Block, ValueFlow, ShardAccounts
from pytoniq_core.tl import BlockIdExt
from pytoniq.liteclient.balancer import LiteBalancer
from pytoniq_core.tlb import Block
from pytoniq_core.tlb.block import BlkPrevInfo

import typing

class BlockScanner:

def __init__(self,
client: LiteClient,
block_handler: coroutine
):
"""
:param client: LiteClient
:param block_handler: function to be called on new block
"""
class BlocksScanner:
def __init__(self, client: LiteClient, block_handler: typing.Callable[[BlockIdExt], typing.Coroutine]):
self.client = client
self.block_handler = block_handler
self.shards_storage = {}
self.shards_storage: dict[str, int] = {}
self.blks_dequeue = deque()
self.inited = False
self.is_initialized = False

async def run(self):
async def run(self) -> None:
if not self.client.inited:
raise Exception('should init client first')
master_blk = self.mc_info_to_tl_blk(await self.client.get_masterchain_info())
if not self.inited:
shards = await self.client.get_all_shards_info(master_blk)
raise Exception("LiteClient must be initialized")

master_blk: BlockIdExt = self._mc_info_to_tl_blk(
await self.client.get_masterchain_info()
)

if not self.is_initialized:
shards: list[BlockIdExt] = await self.client.get_all_shards_info(master_blk)

for shard in shards:
self.shards_storage[self.get_shard_id(shard)] = shard.seqno
self.shards_storage[self._get_shard_id(shard)] = shard.seqno
self.blks_dequeue.append(shard)
self.inited = True

self.is_initialized = True

while True:
self.blks_dequeue.append(master_blk)

shards = await self.client.get_all_shards_info(master_blk)
shards: list[BlockIdExt] = await self.client.get_all_shards_info(master_blk)

for shard in shards:
await self.get_not_seen_shards(shard)
self.shards_storage[self.get_shard_id(shard)] = shard.seqno
await self.parse_not_seen_shards(shard)

self.shards_storage[self._get_shard_id(shard)] = shard.seqno

while self.blks_dequeue:
await self.block_handler(self.blks_dequeue.pop())
await self.block_handler(self.client, self.blks_dequeue.pop())

last_seqno: int = master_blk.seqno

last_seqno = master_blk.seqno
while True:
new_master_blk = self.mc_info_to_tl_blk(await self.client.get_masterchain_info_ext())
new_master_blk: BlockIdExt = self._mc_info_to_tl_blk(
await self.client.get_masterchain_info_ext()
)

if new_master_blk.seqno != last_seqno:
master_blk = new_master_blk
break

async def get_not_seen_shards(self, shard: BlockIdExt):
if self.shards_storage.get(self.get_shard_id(shard)) == shard.seqno:
return []
result = []
async def parse_not_seen_shards(self, shard: BlockIdExt):
if self.shards_storage.get(self._get_shard_id(shard)) == shard.seqno:
return

self.blks_dequeue.append(shard)
full_blk = await self.client.raw_get_block_header(shard)
prev_ref = full_blk.info.prev_ref
if prev_ref.type_ == 'prev_blk_info': # only one prev block
full_blk: Block = await self.client.raw_get_block_header(shard)
prev_ref: BlkPrevInfo = full_blk.info.prev_ref

if prev_ref.type_ == "prev_blk_info":
prev: ExtBlkRef = prev_ref.prev
await self.get_not_seen_shards(BlockIdExt(
workchain=shard.workchain, seqno=prev.seqno, shard=shard.shard,
root_hash=prev.root_hash, file_hash=prev.file_hash

await self.parse_not_seen_shards(
BlockIdExt(
workchain = shard.workchain,
seqno = prev.seqno,
shard = shard.shard,
root_hash = prev.root_hash,
file_hash = prev.file_hash
)
)

else:
prev1: ExtBlkRef = prev_ref.prev1
prev2: ExtBlkRef = prev_ref.prev2
await self.get_not_seen_shards(BlockIdExt(
workchain=shard.workchain, seqno=prev1.seqno, shard=shard.shard,
root_hash=prev1.root_hash, file_hash=prev1.file_hash

await self.parse_not_seen_shards(
BlockIdExt(
workchain = shard.workchain,
seqno = prev1.seqno,
shard = shard.shard,
root_hash = prev1.root_hash,
file_hash = prev1.file_hash
)
)
await self.get_not_seen_shards(BlockIdExt(
workchain=shard.workchain, seqno=prev2.seqno, shard=shard.shard,
root_hash=prev2.root_hash, file_hash=prev2.file_hash

await self.parse_not_seen_shards(
BlockIdExt(
workchain = shard.workchain,
seqno = prev2.seqno,
shard = shard.shard,
root_hash = prev2.root_hash,
file_hash = prev2.file_hash
)
)
return result

@staticmethod
def mc_info_to_tl_blk(info: dict):
return BlockIdExt.from_dict(info['last'])
def _mc_info_to_tl_blk(info: dict) -> BlockIdExt:
return BlockIdExt.from_dict(info["last"])

@staticmethod
def get_shard_id(blk: BlockIdExt):
return f'{blk.workchain}:{blk.shard}'
def _get_shard_id(blk: BlockIdExt) -> str:
return "{}:{}".format(
blk.workchain,
blk.shard
)
2 changes: 1 addition & 1 deletion src/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
TOKEN = 'YOUR_API_KEY_HERE'
TOKEN = 'YOUR_BOT_API_KEY_HERE'
CHAT_ID = 0

# JETTON ADDRESS
Expand Down
76 changes: 59 additions & 17 deletions src/functions.py
Original file line number Diff line number Diff line change
@@ -1,54 +1,80 @@
from pytoniq import begin_cell, Cell, Slice, LiteClient
from logger import logger

import json
import hashlib
from pytoniq import begin_cell, Cell, Slice
import httpx

from config import token
from logger import logger


CUSTOM_LP_DECIMALS: dict[str, int] = {
"jusdt": 12,
"jusdc": 12
}

CUSTOM_METADATA_DECIMALS: dict[str, int] = {
"jusdt": 6,
"jusdc": 6
}


def get_json(filename):
with open(f"{filename}.json", "r") as file:
return json.loads(file.read())


def put_json(content, filename):
with open(f"{filename}.json", "w") as file:
file.write(json.dumps(content))

async def get_lp_price(client, symbol):
r = await client.run_get_method(address=token[symbol], method="estimate_swap_out",
stack=[
begin_cell().store_uint(1, 4).end_cell().begin_parse(),
1000000000
])
if symbol in ["jusdt", "jusdc"]:
return r[1] / 1e12

async def get_lp_price(client: LiteClient, symbol: str):
r = await client.run_get_method(
address = token[symbol],
method = "estimate_swap_out",
stack = [
begin_cell().store_uint(1, 4).end_cell().begin_parse(),
1000000000
]
)

if symbol in CUSTOM_LP_DECIMALS:
return r[1] / 10 ** CUSTOM_LP_DECIMALS[symbol]

return r[1] / 1e9


def get_hash(string: str) -> int:
return int.from_bytes(hashlib.sha256(string.encode()).digest(), 'big')


def get_str_attr_value(meta: dict, attr_name: str):
hash_ = get_hash(attr_name)
value: Slice = meta.get(hash_)

if value is not None:
return value.load_snake_string().replace('\x00', '')


def get_bytes_attr_value(meta: dict, attr_name: str) -> bytes:
hash_ = get_hash(attr_name)
value: Slice = meta.get(hash_)

if value is not None:
return value.load_snake_bytes()


def process_metadata(cell: Cell):
cs = cell.begin_parse()

if not len(cell.refs): # some metadata cells do not have b'\x01' prefix
url = cs.load_snake_string().replace('\x01', '')
return url
return cs.load_snake_string().replace('\x01', '')

else:
cs.load_uint(8)
metadata = cs.load_dict(key_length=256)

if metadata is None:
return {}

Expand All @@ -69,27 +95,43 @@ def process_metadata(cell: Cell):

return result

async def get_jetton_content_by_jetton_wallet(client, jetton_wallet):

async def get_jetton_content_by_jetton_wallet(client: LiteClient, jetton_wallet):
result = {}

stack = await client.run_get_method(address=jetton_wallet, method="get_wallet_data", stack=[])
stack = await client.run_get_method(
address = jetton_wallet,
method = "get_wallet_data",
stack = []
)

result["jetton_address"] = stack[2].load_address().to_str(1, 1, 1)

stack = await client.run_get_method(address=result["jetton_address"], method="get_jetton_data", stack=[])
stack = await client.run_get_method(
address = result["jetton_address"],
method = "get_jetton_data",
stack = []
)

metadata = process_metadata(stack[3])

if isinstance(metadata, str):
async with httpx.AsyncClient() as async_client:
if metadata[0:4] == "ipfs":
r = await async_client.get(f"https://ipfs.io/ipfs/{metadata.replace('ipfs://', '')}")
else:
r = await async_client.get(metadata)
r = r.json()

r: dict = r.json()

result["decimals"] = r.get("decimals", 9)
result["symbol"] = r.get("symbol", "None")

elif not metadata["symbol"]:
async with httpx.AsyncClient() as async_client:
r = await async_client.get(metadata["uri"])
r = r.json()
r: dict = r.json()

result["decimals"] = r.get("decimals", 9)
result["symbol"] = r.get("symbol", "None")

Expand Down
Loading