Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 23 additions & 31 deletions bigquery_to_datadog.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
with open("config.json", "r") as config_file:
config = json.load(config_file)

START_TIMESTAMP = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")
PROJECT_ID = config["bigquery_project_id"]

# Configuration
Expand All @@ -25,11 +24,10 @@
JOIN `{project_id}.full_node_stream.order_places` p
ON p.client_id = CAST(s.client_id AS STRING)
AND p.address = s.address
AND p.received_at > TIMESTAMP("{start_timestamp}")
AND p.received_at > TIMESTAMP(@timestamp)
WHERE s.sent_at > TIMESTAMP("{start_timestamp}")
WHERE s.sent_at > TIMESTAMP(DATE_ADD(CURRENT_DATE, INTERVAL -1 DAY))
AND s.address = @maker_address
AND TIMESTAMP_TRUNC(s.sent_at, DAY) > TIMESTAMP(TIMESTAMP_ADD(CURRENT_DATE, INTERVAL -1 DAY))
AND p.received_at > TIMESTAMP(@timestamp)
AND p.received_at > TIMESTAMP(DATE_ADD(CURRENT_DATE, INTERVAL -1 DAY))
UNION ALL
SELECT p.received_at
, TIMESTAMP_DIFF(p.received_at, s.sent_at, millisecond) AS latency
Expand All @@ -38,16 +36,12 @@
JOIN `{project_id}.indexer_stream_new.received_orders_and_cancels` p
ON p.client_id = CAST(s.client_id AS STRING)
AND p.address = s.address
AND p.received_at > TIMESTAMP("{start_timestamp}")
AND p.received_at > TIMESTAMP(@timestamp)
WHERE s.sent_at > TIMESTAMP("{start_timestamp}")
WHERE s.sent_at > TIMESTAMP(DATE_ADD(CURRENT_DATE, INTERVAL -1 DAY))
AND s.address = @maker_address
AND TIMESTAMP_TRUNC(s.sent_at, DAY) > TIMESTAMP(TIMESTAMP_ADD(CURRENT_DATE, INTERVAL -1 DAY))
AND p.received_at > TIMESTAMP(@timestamp)
AND p.received_at > TIMESTAMP(DATE_ADD(CURRENT_DATE, INTERVAL -1 DAY))
ORDER BY 1
""".format(
start_timestamp=START_TIMESTAMP,
project_id=PROJECT_ID,
),
""".format(project_id=PROJECT_ID),
"params": {"maker_address": config["maker_address"]},
"metric_name": "bigquery.short_term_order_latency",
},
Expand All @@ -60,30 +54,25 @@
FROM `{project_id}.latency_experiments.long_running_stateful_orders` s
JOIN `{project_id}.full_node_stream.order_places` p
ON p.client_id = CAST(s.client_id AS STRING)
AND p.address = s.address
AND p.received_at > TIMESTAMP("{start_timestamp}")
AND p.received_at > TIMESTAMP(@timestamp)
WHERE s.sent_at > TIMESTAMP("{start_timestamp}")
AND s.address = @stateful_address
AND TIMESTAMP_TRUNC(s.sent_at, DAY) > TIMESTAMP(TIMESTAMP_ADD(CURRENT_DATE, INTERVAL -1 DAY))
AND p.address = s.address
WHERE s.sent_at > TIMESTAMP(DATE_ADD(CURRENT_DATE, INTERVAL -1 DAY))
AND s.address = @stateful_address
AND p.received_at > TIMESTAMP(@timestamp)
AND p.received_at > TIMESTAMP(DATE_ADD(CURRENT_DATE, INTERVAL -1 DAY))
UNION ALL
SELECT p.received_at
, TIMESTAMP_DIFF(p.received_at, s.sent_at, millisecond) AS latency
, p.server_address
FROM `{project_id}.latency_experiments.long_running_stateful_orders` s
JOIN `{project_id}.indexer_stream_new.received_orders_and_cancels` p
ON p.client_id = CAST(s.client_id AS STRING)
AND p.address = s.address
AND p.received_at > TIMESTAMP("{start_timestamp}")
AND p.received_at > TIMESTAMP(@timestamp)
WHERE s.sent_at > TIMESTAMP("{start_timestamp}")
AND s.address = @stateful_address
AND TIMESTAMP_TRUNC(s.sent_at, DAY) > TIMESTAMP(TIMESTAMP_ADD(CURRENT_DATE, INTERVAL -1 DAY))
AND p.address = s.address
WHERE s.sent_at > TIMESTAMP(DATE_ADD(CURRENT_DATE, INTERVAL -1 DAY))
AND s.address = @stateful_address
AND p.received_at > TIMESTAMP(@timestamp)
AND p.received_at > TIMESTAMP(DATE_ADD(CURRENT_DATE, INTERVAL -1 DAY))
ORDER BY 1
""".format(
start_timestamp=START_TIMESTAMP,
project_id=PROJECT_ID,
),
""".format(project_id=PROJECT_ID),
"params": {"stateful_address": config["stateful_address"]},
"metric_name": "bigquery.stateful_order_latency",
},
Expand All @@ -106,7 +95,9 @@ def load_last_processed_timestamps():
if os.path.exists(STATE_FILE):
with open(STATE_FILE, "r") as f:
return json.load(f)
return {query["name"]: START_TIMESTAMP for query in QUERIES}
# Default to 1 day ago on first run, aligned with the query window
default = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d %H:%M:%S")
return {query["name"]: default for query in QUERIES}


def save_last_processed_timestamps(timestamps):
Expand Down Expand Up @@ -144,7 +135,8 @@ def monitor():
params = query_info["params"]
metric_name = query_info["metric_name"]
last_processed_timestamp = last_processed_timestamps.get(
query_name, START_TIMESTAMP
query_name,
(datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d %H:%M:%S")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

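In general, prefer `.isoformat()` over a hand-written `strftime("%Y-%m-%d %H:%M:%S")` format string when serializing a datetime.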

)
results = run_query(client, query, params, last_processed_timestamp)

Expand Down