diff --git a/bigquery_to_datadog.py b/bigquery_to_datadog.py index 5d877c9..ff2d2f8 100644 --- a/bigquery_to_datadog.py +++ b/bigquery_to_datadog.py @@ -10,7 +10,6 @@ with open("config.json", "r") as config_file: config = json.load(config_file) -START_TIMESTAMP = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S") PROJECT_ID = config["bigquery_project_id"] # Configuration @@ -25,11 +24,10 @@ JOIN `{project_id}.full_node_stream.order_places` p ON p.client_id = CAST(s.client_id AS STRING) AND p.address = s.address - AND p.received_at > TIMESTAMP("{start_timestamp}") - AND p.received_at > TIMESTAMP(@timestamp) - WHERE s.sent_at > TIMESTAMP("{start_timestamp}") + WHERE s.sent_at > TIMESTAMP(DATE_ADD(CURRENT_DATE, INTERVAL -1 DAY)) AND s.address = @maker_address - AND TIMESTAMP_TRUNC(s.sent_at, DAY) > TIMESTAMP(TIMESTAMP_ADD(CURRENT_DATE, INTERVAL -1 DAY)) + AND p.received_at > TIMESTAMP(@timestamp) + AND p.received_at > TIMESTAMP(DATE_ADD(CURRENT_DATE, INTERVAL -1 DAY)) UNION ALL SELECT p.received_at , TIMESTAMP_DIFF(p.received_at, s.sent_at, millisecond) AS latency @@ -38,16 +36,12 @@ JOIN `{project_id}.indexer_stream_new.received_orders_and_cancels` p ON p.client_id = CAST(s.client_id AS STRING) AND p.address = s.address - AND p.received_at > TIMESTAMP("{start_timestamp}") - AND p.received_at > TIMESTAMP(@timestamp) - WHERE s.sent_at > TIMESTAMP("{start_timestamp}") + WHERE s.sent_at > TIMESTAMP(DATE_ADD(CURRENT_DATE, INTERVAL -1 DAY)) AND s.address = @maker_address - AND TIMESTAMP_TRUNC(s.sent_at, DAY) > TIMESTAMP(TIMESTAMP_ADD(CURRENT_DATE, INTERVAL -1 DAY)) + AND p.received_at > TIMESTAMP(@timestamp) + AND p.received_at > TIMESTAMP(DATE_ADD(CURRENT_DATE, INTERVAL -1 DAY)) ORDER BY 1 - """.format( - start_timestamp=START_TIMESTAMP, - project_id=PROJECT_ID, - ), + """.format(project_id=PROJECT_ID), "params": {"maker_address": config["maker_address"]}, "metric_name": "bigquery.short_term_order_latency", }, @@ -60,12 +54,11 @@ FROM `{project_id}.latency_experiments.long_running_stateful_orders` s JOIN `{project_id}.full_node_stream.order_places` p ON p.client_id = CAST(s.client_id AS STRING) - AND p.address = s.address - AND p.received_at > TIMESTAMP("{start_timestamp}") - AND p.received_at > TIMESTAMP(@timestamp) - WHERE s.sent_at > TIMESTAMP("{start_timestamp}") - AND s.address = @stateful_address - AND TIMESTAMP_TRUNC(s.sent_at, DAY) > TIMESTAMP(TIMESTAMP_ADD(CURRENT_DATE, INTERVAL -1 DAY)) + AND p.address = s.address + WHERE s.sent_at > TIMESTAMP(DATE_ADD(CURRENT_DATE, INTERVAL -1 DAY)) + AND s.address = @stateful_address + AND p.received_at > TIMESTAMP(@timestamp) + AND p.received_at > TIMESTAMP(DATE_ADD(CURRENT_DATE, INTERVAL -1 DAY)) UNION ALL SELECT p.received_at , TIMESTAMP_DIFF(p.received_at, s.sent_at, millisecond) AS latency @@ -73,17 +66,13 @@ FROM `{project_id}.latency_experiments.long_running_stateful_orders` s JOIN `{project_id}.indexer_stream_new.received_orders_and_cancels` p ON p.client_id = CAST(s.client_id AS STRING) - AND p.address = s.address - AND p.received_at > TIMESTAMP("{start_timestamp}") - AND p.received_at > TIMESTAMP(@timestamp) - WHERE s.sent_at > TIMESTAMP("{start_timestamp}") - AND s.address = @stateful_address - AND TIMESTAMP_TRUNC(s.sent_at, DAY) > TIMESTAMP(TIMESTAMP_ADD(CURRENT_DATE, INTERVAL -1 DAY)) + AND p.address = s.address + WHERE s.sent_at > TIMESTAMP(DATE_ADD(CURRENT_DATE, INTERVAL -1 DAY)) + AND s.address = @stateful_address + AND p.received_at > TIMESTAMP(@timestamp) + AND p.received_at > TIMESTAMP(DATE_ADD(CURRENT_DATE, INTERVAL -1 DAY)) ORDER BY 1 - """.format( - start_timestamp=START_TIMESTAMP, - project_id=PROJECT_ID, - ), + """.format(project_id=PROJECT_ID), "params": {"stateful_address": config["stateful_address"]}, "metric_name": "bigquery.stateful_order_latency", }, @@ -106,7 +95,9 @@ def load_last_processed_timestamps(): if os.path.exists(STATE_FILE): with open(STATE_FILE, "r") as f: return json.load(f) - return {query["name"]: START_TIMESTAMP for query in QUERIES} + # Default to 1 day ago on first run, aligned with the query window + default = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d %H:%M:%S") + return {query["name"]: default for query in QUERIES} def save_last_processed_timestamps(timestamps): @@ -144,7 +135,8 @@ def monitor(): params = query_info["params"] metric_name = query_info["metric_name"] last_processed_timestamp = last_processed_timestamps.get( - query_name, START_TIMESTAMP + query_name, + (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d %H:%M:%S") ) results = run_query(client, query, params, last_processed_timestamp)