Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions backend/apps/api/v1/views_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def get(self, request, *args, **kwargs):
"Descrição do Conjunto de Dados",
"Nome Tabela",
"Descrição Tabela",
"Dados Mais Recentes Fechados",
"Disponibilidade dos dados",
"Número de linhas da tabela",
"Cobertura temporal (anos e meses que temos dados disponíveis)",
],
Expand All @@ -33,7 +33,7 @@ def get(self, request, *args, **kwargs):
"Dataset Description",
"Table Name",
"Table Description",
"Most Recent Closed Data",
"Data Availability",
"Number of rows in the table",
"Temporal coverage (years and months for which data is available)",
],
Expand All @@ -42,7 +42,7 @@ def get(self, request, *args, **kwargs):
"Descripción del Conjunto de Datos",
"Nombre de la Tabla",
"Descripción de la Tabla",
"Datos Más Recientes Cerrados",
"Disponibilidad de los datos",
"Número de filas de la tabla",
"Cobertura temporal (años y meses para los cuales hay datos disponibles)",
],
Expand All @@ -58,18 +58,18 @@ def get(self, request, *args, **kwargs):

status_map = {
"pt": {
"closed": "Dados Parcialmente Fechados",
"open": "Dados 100% Abertos",
"closed": "Parcialmente ou totalmente pago",
"open": "Totalmente grátis",
"to": " a ",
},
"en": {
"closed": "Partially Closed Data",
"open": "100% Open Data",
"closed": "Partially or totally paid",
"open": "Totally free",
"to": " to ",
},
"es": {
"closed": "Datos Parcialmente Cerrados",
"open": "Datos 100% Abiertos",
"closed": "Parcial o totalmente de pago",
"open": "Totalmente gratis",
"to": " a ",
},
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# -*- coding: utf-8 -*-
from enum import Enum


class Querys(Enum):
FLOWS_FAILED_LAST_WEEK = """
query($since: timestamptz!) {
flow(
where: {
schedule: { _is_null: false }
is_schedule_active: {_eq: true}
archived: {_eq: false}
flow_runs: {
state: { _eq: "Failed" }
start_time: { _gte: $since }
}
}
) {
id
created
name
}
}
"""

LAST_COMPLETED_RUNS_TASKS = """
query LastTwoCompletedRunsWithTasks($flow_id: uuid!) {
flow_run(
where: {
flow_id: { _eq: $flow_id }
state: { _in: ["Success", "Failed"] }
}
order_by: { start_time: desc }
limit: 2
) {
id
name
start_time
state
task_runs(
where: {
state: { _in: ["Success", "Failed"] }
}
order_by: { start_time: desc }
limit: 1) {
id
state
end_time
state_message
task {
id
name
}
}
}
}
"""


class Constants(Enum):
TASKS_NAME_DISABLE = ("run_dbt",)
FLOW_SUCCESS_STATE = "Success"
FLOW_FAILED_STATE = "Failed"

PREFECT_URL = "https://prefect.basedosdados.org/"
PREFECT_URL_FLOW = PREFECT_URL + "flow/"
PREFECT_URL_API = PREFECT_URL + "api"

DISCORD_ROLE_DADOS = "865034571469160458"
TEXT_FLOW_FORMAT = "- {run_name} | Last failure `{task_name}` | {link}"

STATE_MESSAGE_IGNORE = (
"No heartbeat detected from the remote task; marking the run as failed.",
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# -*- coding: utf-8 -*-
from datetime import datetime, timedelta


def parse_datetime(value: str) -> datetime:
return datetime.fromisoformat(value)


def one_week_ago() -> str:
from django.utils import timezone

return (timezone.now() - timedelta(days=7)).strftime("%Y-%m-%dT%H:%M:%SZ")
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# -*- coding: utf-8 -*-
from typing import TYPE_CHECKING, Optional

from .constants import Constants
from .datetime_utils import parse_datetime

if TYPE_CHECKING:
from .service import FlowService


class Task:
def __init__(self, id: str, name: str):
self.id = id
self.name = name


class TaskRun:
def __init__(
self,
id: str,
state: str,
end_time: Optional[str],
state_message: str,
task: dict,
):
self.id = id
self.state = state
self.end_time = end_time
self.state_message = state_message
self.task = Task(**task)


class FlowRun:
def __init__(
self,
id: str,
name: str,
start_time: str,
state: str,
task_runs: list,
):
self.id = id
self.name = name
self.start_time = parse_datetime(start_time)
self.state = state
self.task_runs = TaskRun(**task_runs[0])


class FlowDisable:
def __init__(self, id: str, created: str, service: "FlowService"):
self.id = id
self.created = parse_datetime(created)
self.service = service
self.runs = self.get_runs()

def get_runs(self):
response = self.service.last_completed_runs_tasks(self.id)
return [FlowRun(**run) for run in response["flow_run"]]

def validate(self) -> bool:
last_run, next_last = self.runs
failed = Constants.FLOW_FAILED_STATE.value

dbt_failed_after_created = (
last_run.task_runs.task.name in Constants.TASKS_NAME_DISABLE.value
and last_run.start_time >= self.created
and last_run.state == failed
and last_run.task_runs.state_message not in Constants.STATE_MESSAGE_IGNORE.value
)

any_failed_after_created = (
last_run.state == failed
and next_last.state == failed
and max(last_run.start_time, next_last.start_time) >= self.created
)

return dbt_failed_after_created or any_failed_after_created
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# -*- coding: utf-8 -*-
import os
from typing import Dict

from gql import Client, gql
from gql.transport.requests import RequestsHTTPTransport
from loguru import logger

from backend.custom.client import send_discord_message

from .constants import Constants, Querys
from .datetime_utils import one_week_ago
from .models import FlowDisable

logger = logger.bind(module="core")


class MakeClient:
def __init__(self):
self.graphql_url = Constants.PREFECT_URL_API.value
self.query = self.make_client({"Authorization": f"Bearer {os.getenv('API_KEY_PREFECT')}"})

def make_client(self, headers: Dict[str, str] = None) -> Client:
transport = RequestsHTTPTransport(url=self.graphql_url, headers=headers, use_json=True)

return Client(transport=transport, fetch_schema_from_transport=False)


class FlowService:
def __init__(self):
self.client = MakeClient()

def flows_failed_last_week(self) -> list:
since = one_week_ago()

variables = {"since": since}

response = self.client.query.execute(
gql(Querys.FLOWS_FAILED_LAST_WEEK.value), variable_values=variables
)

return [{"id": fail["id"], "created": fail["created"]} for fail in response["flow"]]

def last_completed_runs_tasks(self, flow_id: str):
variables = {"flow_id": flow_id}

return self.client.query.execute(
gql(Querys.LAST_COMPLETED_RUNS_TASKS.value), variable_values=variables
)

def set_flow_schedule(self, flow_id: str, active: bool):
mutation_name = "set_schedule_active" if active else "set_schedule_inactive"

query = f"""
mutation SetFlowSchedule($flow_id: UUID!) {{
{mutation_name}(
input: {{
flow_id: $flow_id
}}
) {{
success
}}
}}
"""

variables = {"flow_id": flow_id}

return self.client.query.execute(gql(query), variable_values=variables)

def disable_unhealthy_flow_schedules(self, dry_run: bool = False) -> None:
flows_data = self.flows_failed_last_week()

flows = [FlowDisable(**flow, service=self) for flow in flows_data]

flows_to_disable = [flow for flow in flows if flow.validate()]

logger.info("Flows para ficar em alerta:")

for flow in flows:
logger.info(
Constants.TEXT_FLOW_FORMAT.value.format(
task_name=flow.runs[0].task_runs.task.name,
run_name=flow.runs[0].name,
link=Constants.PREFECT_URL_FLOW.value + flow.id,
)
)

if flows_to_disable and not dry_run:
logger.info("Flows desativados:")

for flow in flows_to_disable:
self.set_flow_schedule(flow_id=flow.id, active=False)

logger.info(
Constants.TEXT_FLOW_FORMAT.value.format(
task_name=flow.runs[0].task_runs.task.name,
run_name=flow.runs[0].name,
link=Constants.PREFECT_URL_FLOW.value + flow.id,
)
)

message_parts = [
self.format_flows("🚨 Flows em alerta", flows),
self.format_flows(
f"⛔ Flows desativados <@&{Constants.DISCORD_ROLE_DADOS.value}>",
flows_to_disable,
),
]

send_discord_message("\n\n".join(message_parts))

@staticmethod
def format_flows(title: str, flows: list) -> str:
if not flows:
return f"**{title}**\n_(nenhum)_"

lines = [f"**{title}**"]
for flow in flows:
link = Constants.PREFECT_URL_FLOW.value + flow.id
lines.append(
Constants.TEXT_FLOW_FORMAT.value.format(
task_name=flow.runs[0].task_runs.task.name,
run_name=flow.runs[0].name,
link=link,
)
)
return "\n".join(lines)
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
from django.core.management.base import BaseCommand

from ._disable_unhealthy_flow_schedules.service import FlowService


class Command(BaseCommand):
help = "Disable unhealthy flow schedules"

def add_arguments(self, parser):
parser.add_argument(
"--dry-run",
action="store_true",
help="Log flows that would be disabled without disabling them",
)

def handle(self, *args, **options):
FlowService().disable_unhealthy_flow_schedules(dry_run=options["dry_run"])
7 changes: 7 additions & 0 deletions backend/apps/core/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,10 @@ def sync_database_with_prod():
call_command("fetch_metabase")
call_command("populate")
return "Sincronização concluída com sucesso"


@db_periodic_task(crontab(minute="*/20"))
# @production_task
def disable_unhealthy_flow_schedules():
"""Disable unhealthy flow schedules"""
call_command("disable_unhealthy_flow_schedules")
Loading
Loading