From 85d192e12eb81dea449e2db53000fab510c0021c Mon Sep 17 00:00:00 2001 From: Andrey Date: Mon, 12 Sep 2022 12:37:38 +0300 Subject: [PATCH 1/6] Add metadata uri to metadata label_data. --- crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py | 1 + 1 file changed, 1 insertion(+) diff --git a/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py b/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py index 14ae5a036..b0b5ba988 100644 --- a/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py +++ b/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py @@ -29,6 +29,7 @@ def metadata_to_label( { "type": "metadata", "token_id": token_uri_data.token_id, + "token_uri": token_uri_data.token_uri, "metadata": metadata, } ).replace(r"\u0000", "") From 9bb5fb4794d5b8feb49ec19e5679d3aeb9357d0b Mon Sep 17 00:00:00 2001 From: Andrey Date: Mon, 12 Sep 2022 16:02:32 +0300 Subject: [PATCH 2/6] Add thread executor. --- .../mooncrawl/metadata_crawler/cli.py | 27 ++++++++++--------- crawlers/mooncrawl/mooncrawl/settings.py | 18 +++++++++++++ 2 files changed, 33 insertions(+), 12 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py index 6140fd788..5a83b6941 100644 --- a/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py @@ -4,6 +4,7 @@ import urllib.request import logging from typing import Dict, Any +from concurrent.futures import ThreadPoolExecutor from moonstreamdb.blockchain import AvailableBlockchainType from moonstreamdb.db import ( @@ -19,8 +20,10 @@ metadata_to_label, ) from ..settings import ( + MOONSTREAM_METADATA_CRAWLER_THREADS, MOONSTREAM_STATE_CRAWLER_DB_STATEMENT_TIMEOUT_MILLIS, ) +from ..data import TokenURIs logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -29,7 +32,7 @@ batch_size = 50 -def crawl_uri(metadata_uri: str) -> Any: +def crawl_uri(token_uri_data: TokenURIs) -> Any: """ Get metadata from URI @@ -39,7 +42,7 @@ def crawl_uri(metadata_uri: str) -> Any: while retry < 3: try: - response = urllib.request.urlopen(metadata_uri, timeout=5) + response = urllib.request.urlopen(token_uri_data.token_uri, timeout=5) if response.status == 200: result = json.loads(response.read()) @@ -54,7 +57,7 @@ def crawl_uri(metadata_uri: str) -> Any: logger.error(err) retry += 1 continue - return result + return result, token_uri_data def parse_metadata(blockchain_type: AvailableBlockchainType, batch_size: int): @@ -89,28 +92,28 @@ def parse_metadata(blockchain_type: AvailableBlockchainType, batch_size: int): for address in tokens_uri_by_address: - already_parsed = get_current_metadata_for_address( - db_session=db_session, blockchain_type=blockchain_type, address=address - ) - for requests_chunk in [ tokens_uri_by_address[address][i : i + batch_size] for i in range(0, len(tokens_uri_by_address[address]), batch_size) ]: writed_labels = 0 - for token_uri_data in requests_chunk: - if token_uri_data.token_id not in already_parsed: - metadata = crawl_uri(token_uri_data.token_uri) + with ThreadPoolExecutor( + max_workers=MOONSTREAM_METADATA_CRAWLER_THREADS + ) as executor: + for result in executor.map( + crawl_uri, [request for request in requests_chunk] + ): db_session.add( metadata_to_label( + metadata=result[0], blockchain_type=blockchain_type, - metadata=metadata, - token_uri_data=token_uri_data, + token_uri_data=result[1], ) ) writed_labels += 1 + commit_session(db_session) logger.info(f"Write {writed_labels} labels for {address}") diff --git a/crawlers/mooncrawl/mooncrawl/settings.py b/crawlers/mooncrawl/mooncrawl/settings.py index 27d6faf40..7cddcdd5b 100644 --- a/crawlers/mooncrawl/mooncrawl/settings.py +++ b/crawlers/mooncrawl/mooncrawl/settings.py @@ -176,3 +176,21 @@ AvailableBlockchainType.MUMBAI: "0xe9939e7Ea7D7fb619Ac57f648Da7B1D425832631", AvailableBlockchainType.ETHEREUM: "0x5BA1e12693Dc8F9c48aAD8770482f4739bEeD696", } + + +# Metadata crawler + +MOONSTREAM_METADATA_CRAWLER_THREADS = 10 + +MOONSTREAM_METADATA_CRAWLER_THREADS_RAW = os.environ.get( + "MOONSTREAM_METADATA_CRAWLER_THREADS" +) +try: + if MOONSTREAM_METADATA_CRAWLER_THREADS_RAW is not None: + MOONSTREAM_METADATA_CRAWLER_THREADS = int( + MOONSTREAM_METADATA_CRAWLER_THREADS_RAW + ) +except: + raise Exception( + f"Could not parse MOONSTREAM_METADATA_CRAWLER_THREADS as int: {MOONSTREAM_METADATA_CRAWLER_THREADS_RAW}" + ) From c11ddd51ce412bbfbff010cc440f3bb99daea5b4 Mon Sep 17 00:00:00 2001 From: Andrey Date: Wed, 14 Sep 2022 19:49:07 +0300 Subject: [PATCH 3/6] Add update logic. --- crawlers/mooncrawl/mooncrawl/data.py | 2 +- .../mooncrawl/metadata_crawler/cli.py | 44 ++++-- .../mooncrawl/metadata_crawler/db.py | 143 +++++++++++------- 3 files changed, 119 insertions(+), 70 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/data.py b/crawlers/mooncrawl/mooncrawl/data.py index b46cc01ef..0e3da95d3 100644 --- a/crawlers/mooncrawl/mooncrawl/data.py +++ b/crawlers/mooncrawl/mooncrawl/data.py @@ -54,6 +54,6 @@ class QueryDataUpdate(BaseModel): class TokenURIs(BaseModel): token_id: str token_uri: str - block_number: str + block_number: int block_timestamp: str address: str diff --git a/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py index 5a83b6941..75f3767db 100644 --- a/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py @@ -1,9 +1,11 @@ import argparse import json +from time import time, sleep from urllib.error import HTTPError import urllib.request import logging -from typing import Dict, Any +import requests +from typing import Any from concurrent.futures import ThreadPoolExecutor from moonstreamdb.blockchain import AvailableBlockchainType @@ -15,8 +17,8 @@ from sqlalchemy.orm import sessionmaker from .db import ( commit_session, - get_uris_of_tokens, - get_current_metadata_for_address, + get_uri_addresses, + get_not_updated_metadata_for_address, metadata_to_label, ) from ..settings import ( @@ -41,8 +43,14 @@ def crawl_uri(token_uri_data: TokenURIs) -> Any: result = None while retry < 3: try: - - response = urllib.request.urlopen(token_uri_data.token_uri, timeout=5) + req = urllib.request.Request( + token_uri_data.token_uri, + None, + { + "User-agent": "Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.5) Gecko/20091102 Firefox/3.5.5" + }, + ) + response = urllib.request.urlopen(req, timeout=5) if response.status == 200: result = json.loads(response.read()) @@ -51,12 +59,17 @@ def crawl_uri(token_uri_data: TokenURIs) -> Any: except HTTPError as error: logger.error(f"request end with error statuscode: {error.code}") + logger.error(f"requested uri: {token_uri_data.token_uri}") retry += 1 + sleep(2) continue except Exception as err: logger.error(err) + logger.error(f"requested uri: {token_uri_data.token_uri}") retry += 1 + sleep(2) continue + sleep(0.5) return result, token_uri_data @@ -81,20 +94,23 @@ def parse_metadata(blockchain_type: AvailableBlockchainType, batch_size: int): # run crawling of levels try: - uris_of_tokens = get_uris_of_tokens(db_session, blockchain_type) + meradata_addresses = get_uri_addresses(db_session, blockchain_type) - tokens_uri_by_address: Dict[str, Any] = {} + for address in meradata_addresses: - for token_uri_data in uris_of_tokens: - if token_uri_data.address not in tokens_uri_by_address: - tokens_uri_by_address[token_uri_data.address] = [] - tokens_uri_by_address[token_uri_data.address].append(token_uri_data) + not_updated_tokens = get_not_updated_metadata_for_address( + db_session, + blockchain_type, + address=address, + ) - for address in tokens_uri_by_address: + logger.info( + f"Start crawling {len(not_updated_tokens)} tokens of address {address}" + ) for requests_chunk in [ - tokens_uri_by_address[address][i : i + batch_size] - for i in range(0, len(tokens_uri_by_address[address]), batch_size) + not_updated_tokens[i : i + batch_size] + for i in range(0, len(not_updated_tokens), batch_size) ]: writed_labels = 0 diff --git a/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py b/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py index b0b5ba988..d21ed374c 100644 --- a/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py +++ b/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py @@ -1,6 +1,7 @@ import logging import json from typing import Dict, Any, Optional, List +from unittest import result from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model from sqlalchemy.orm import Session @@ -60,9 +61,9 @@ def commit_session(db_session: Session) -> None: raise e -def get_uris_of_tokens( +def get_uri_addresses( db_session: Session, blockchain_type: AvailableBlockchainType -) -> List[TokenURIs]: +) -> List[str]: """ Get meatadata URIs. @@ -70,47 +71,22 @@ def get_uris_of_tokens( label_model = get_label_model(blockchain_type) - table = label_model.__tablename__ - - metadata_for_parsing = db_session.execute( - """ SELECT - DISTINCT ON(label_data -> 'inputs'-> 0 ) label_data -> 'inputs'-> 0 as token_id, - label_data -> 'result' as token_uri, - block_number as block_number, - block_timestamp as block_timestamp, - address as address - - FROM - {} - WHERE - label = :label - AND label_data ->> 'name' = :name - ORDER BY - label_data -> 'inputs'-> 0 ASC, - block_number :: INT DESC; - """.format( - table - ), - {"table": table, "label": VIEW_STATE_CRAWLER_LABEL, "name": "tokenURI"}, - ) + addresses = ( + db_session.query(label_model.address.distinct()) + .filter(label_model.label == VIEW_STATE_CRAWLER_LABEL) + .filter(label_model.label_data["name"].astext == "tokenURI") + ).all() - results = [ - TokenURIs( - token_id=data[0], - token_uri=data[1], - block_number=data[2], - block_timestamp=data[3], - address=data[4], - ) - for data in metadata_for_parsing - ] + result = [address[0] for address in addresses] - return results + return result -def get_current_metadata_for_address( - db_session: Session, blockchain_type: AvailableBlockchainType, address: str -): +def get_not_updated_metadata_for_address( + db_session: Session, + blockchain_type: AvailableBlockchainType, + address: str, +) -> List[TokenURIs]: """ Get existing metadata. """ @@ -120,22 +96,79 @@ def get_current_metadata_for_address( table = label_model.__tablename__ current_metadata = db_session.execute( - """ SELECT - DISTINCT ON(label_data ->> 'token_id') label_data ->> 'token_id' as token_id - FROM - {} - WHERE - address = :address - AND label = :label - ORDER BY - label_data ->> 'token_id' ASC, - block_number :: INT DESC; - """.format( - table + """ with current_tokens_uri as ( + SELECT + DISTINCT ON((label_data -> 'inputs' -> 0) :: int) (label_data -> 'inputs' -> 0) :: text as token_id, + label_data ->> 'result' as token_uri, + block_number, + address, + block_timestamp + from + {} + where + label = :view_state_label + AND address = :address + and label_data ->> 'name' = 'tokenURI' + order by + (label_data -> 'inputs' -> 0) :: INT ASC, + block_number :: INT DESC + ), + tokens_metadata as ( + SELECT + DISTINCT ON((label_data ->> 'token_id') :: int) (label_data ->> 'token_id') :: text as token_id, + label_data ->>'token_uri' as token_uri, + block_number + from + {} + where + label = :metadata_label + AND address = :address + order by + (label_data ->> 'token_id') :: INT ASC, + block_number :: INT DESC + ), + tokens_state as ( + SELECT + current_tokens_uri.token_id, + current_tokens_uri.token_uri as state_token_uri, + current_tokens_uri.block_number as view_state_block_number, + current_tokens_uri.block_timestamp as block_timestamp, + current_tokens_uri.address as address, + tokens_metadata.block_number as metadata_block_number, + tokens_metadata.token_uri as metadata_token_uri + from + current_tokens_uri + left JOIN tokens_metadata ON current_tokens_uri.token_id = tokens_metadata.token_id + ) + SELECT + token_id, + state_token_uri, + view_state_block_number, + block_timestamp, + address + from + tokens_state + where + view_state_block_number > metadata_block_number OR metadata_token_uri is null OR metadata_token_uri != state_token_uri; + """.format( + table, table ), - {"address": address, "label": METADATA_CRAWLER_LABEL}, - ) + { + "metadata_label": METADATA_CRAWLER_LABEL, + "view_state_label": VIEW_STATE_CRAWLER_LABEL, + "address": address, + }, + ).all() - result = [data[0] for data in current_metadata] + results = [ + TokenURIs( + token_id=data[0], + token_uri=data[1], + block_number=data[2], + block_timestamp=data[3], + address=data[4], + ) + for data in current_metadata + ] - return result + return results From 8afabf9f0fb801307a30e940853477528edeae8e Mon Sep 17 00:00:00 2001 From: Andrey Date: Wed, 14 Sep 2022 20:46:28 +0300 Subject: [PATCH 4/6] Add update logic. --- crawlers/mooncrawl/mooncrawl/data.py | 4 ++- .../mooncrawl/metadata_crawler/cli.py | 27 ++++++++++++---- .../mooncrawl/metadata_crawler/db.py | 32 +++++++++++++++++-- 3 files changed, 53 insertions(+), 10 deletions(-) diff --git a/crawlers/mooncrawl/mooncrawl/data.py b/crawlers/mooncrawl/mooncrawl/data.py index 0e3da95d3..9757033a7 100644 --- a/crawlers/mooncrawl/mooncrawl/data.py +++ b/crawlers/mooncrawl/mooncrawl/data.py @@ -1,7 +1,8 @@ from dataclasses import dataclass from datetime import datetime from enum import Enum -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional +from uuid import UUID from pydantic import BaseModel, Field @@ -57,3 +58,4 @@ class TokenURIs(BaseModel): block_number: int block_timestamp: str address: str + metadata_id: Optional[UUID] = None diff --git a/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py index 75f3767db..b51a130b3 100644 --- a/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/metadata_crawler/cli.py @@ -20,6 +20,7 @@ get_uri_addresses, get_not_updated_metadata_for_address, metadata_to_label, + update_metadata, ) from ..settings import ( MOONSTREAM_METADATA_CRAWLER_THREADS, @@ -121,12 +122,26 @@ def parse_metadata(blockchain_type: AvailableBlockchainType, batch_size: int): for result in executor.map( crawl_uri, [request for request in requests_chunk] ): - db_session.add( - metadata_to_label( - metadata=result[0], - blockchain_type=blockchain_type, - token_uri_data=result[1], - ) + + metadata = result[0] + token_uri_data = result[1] + label = metadata_to_label( + metadata=metadata, + blockchain_type=blockchain_type, + token_uri_data=token_uri_data, + ) + + if token_uri_data.metadata_id is None: + + db_session.add(label) + writed_labels += 1 + continue + + update_metadata( + db_session, + blockchain_type, + token_uri_data.metadata_id, + label, ) writed_labels += 1 diff --git a/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py b/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py index d21ed374c..9aaf18d94 100644 --- a/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py +++ b/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py @@ -117,7 +117,8 @@ def get_not_updated_metadata_for_address( SELECT DISTINCT ON((label_data ->> 'token_id') :: int) (label_data ->> 'token_id') :: text as token_id, label_data ->>'token_uri' as token_uri, - block_number + block_number, + id from {} where @@ -135,7 +136,8 @@ def get_not_updated_metadata_for_address( current_tokens_uri.block_timestamp as block_timestamp, current_tokens_uri.address as address, tokens_metadata.block_number as metadata_block_number, - tokens_metadata.token_uri as metadata_token_uri + tokens_metadata.token_uri as metadata_token_uri, + tokens_metadata.id as metadata_id from current_tokens_uri left JOIN tokens_metadata ON current_tokens_uri.token_id = tokens_metadata.token_id @@ -145,7 +147,8 @@ def get_not_updated_metadata_for_address( state_token_uri, view_state_block_number, block_timestamp, - address + address, + metadata_id from tokens_state where @@ -167,8 +170,31 @@ def get_not_updated_metadata_for_address( block_number=data[2], block_timestamp=data[3], address=data[4], + metadata_id=data[5], ) for data in current_metadata ] return results + + +def update_metadata( + db_session: Session, + blockchain_type: AvailableBlockchainType, + id: Dict[str, Any], + label: Any, +) -> None: + """ + Update metadata. + """ + + label_model = get_label_model(blockchain_type) + + db_session.query(label_model).filter(label_model.id == id).update( + { + "label_data": label.label_data, + "block_number": label.block_number, + "block_timestamp": label.block_timestamp, + }, + synchronize_session=False, + ) From cfbbe3f81780d35ea8b94ae925be0973bd25c76d Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 20 Sep 2022 15:51:15 +0300 Subject: [PATCH 5/6] Add description. --- crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py b/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py index 9aaf18d94..040beb81b 100644 --- a/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py +++ b/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py @@ -89,6 +89,16 @@ def get_not_updated_metadata_for_address( ) -> List[TokenURIs]: """ Get existing metadata. + + We want update metadata in 3 posible condition: + 1) State of tokenURI have higher block_number then block_number of alredy crawled metadata. + 2) metadata_token_uri is none for this token_id(we not crawl that token metadata yet). + 3) metadata_token_uri is different then token_uri from state. + + Query use both labels view_state_crawler and metadata_crawler + + TODO(Andrey): Replace exexute to query builder syntax. + """ label_model = get_label_model(blockchain_type) From 633b6ed9a2dfd09641bf350c655966499c099e0b Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 4 Oct 2022 16:21:31 +0300 Subject: [PATCH 6/6] Add status field in label_data. --- crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py b/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py index 040beb81b..061edefd1 100644 --- a/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py +++ b/crawlers/mooncrawl/mooncrawl/metadata_crawler/db.py @@ -25,12 +25,18 @@ def metadata_to_label( """ label_model = get_label_model(blockchain_type) + status = 1 + + if metadata is None: + status = 0 + sanityzed_label_data = json.loads( json.dumps( { "type": "metadata", "token_id": token_uri_data.token_id, "token_uri": token_uri_data.token_uri, + "status": status, "metadata": metadata, } ).replace(r"\u0000", "")