Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions dag_factory/config/job_trusted_teste.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# DAG-factory job definition: everything the factory needs to generate one
# Airflow DAG that loads a trusted-zone table and runs a data-quality step.
Head:
  DocId: "teste"                      # job/document id — appears to drive the generated DAG name ("dag_teste"); TODO confirm against the factory
  Version: "v1"                       # config schema version
  StartTime: "2023-01-01 00:00:00"    # DAG start_date (format %Y-%m-%d %H:%M:%S)
  Schedule: "0 9 * * *"               # cron schedule_interval — daily at 09:00
  Retries: "Low"                      # symbolic retry level; presumably mapped to retries=1 — verify mapping table
  RetryDelay: "Short"                 # symbolic delay; presumably mapped to timedelta(minutes=1) — verify mapping table
  Gerencia: "Front"                   # management area — combined into the task owner string
  Coord: "RGM"                        # coordination team — combined into the task owner string
  ValueStream: "Franqueado"
  Tags: "factory"                     # Airflow DAG tag(s)

Table:
  ProjectId: "trusted-zone"           # target BigQuery project
  DatasetId: "sellout"                # target BigQuery dataset
  TableId: "tb_real_dia_cupom"        # target BigQuery table

Task:
  Operator: "BigQueryExecuteQueryOperator"   # operator used for the load task
  Sql: "CALL prc_load_tb_real_dia_cupom"     # stored procedure performing the load

DataQuality:
  Operator: "DummyOperator"           # no-op DQ step (placeholder)

Dependencies:
  # Upstream tables this job waits on; each entry becomes an external-task sensor.
  - TableId: tb_loja_venda_so
    PokeInterval: "Short"             # symbolic poke interval; presumably 2*60 s — verify mapping table
60 changes: 60 additions & 0 deletions dags/generated/dag_tb_airflow_devops.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from libs.bigquery_operator import BigQueryExecuteQueryOperator
from libs.last_execution_external_task_sensor import LastExecutionExternalTaskSensor

# Shared task-level defaults applied to every operator in this DAG.
DEFAULT_ARGS = {
    'owner': 'Gerencia: Front, Coord: RGM',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

# Daily DAG (09:00) generated by the DAG factory for tb_airflow_devops.
dag = DAG(
    'dag_tb_airflow_devops',
    start_date=datetime(2023, 1, 1),
    schedule_interval='0 9 * * *',
    default_args=DEFAULT_ARGS,
    catchup=False,
    tags=['factory'],
)

with dag:
    # Wait for the most recent run of the upstream table's DQ task.
    sensor_tb_loja_venda_so = LastExecutionExternalTaskSensor(
        task_id='sensor_tb_loja_venda_so',
        external_dag_id='dag_tb_loja_venda_so',
        external_task_id='dq_tb_loja_venda_so',
        poke_interval=120,                # 2 minutes between pokes
        timeout=600,                      # give up after 10 minutes
        failed_states=['failed', 'skipped', 'upstream_failed'],
    )

    # Re-run data quality on the upstream dependency before loading.
    dq_tb_loja_venda_so = BigQueryExecuteQueryOperator(
        task_id='dq_tb_loja_venda_so',
        sql='CALL prc_data_quality',
        use_legacy_sql=False,
        depends_on_past=False,
        priority='BATCH',
    )

    # Main load: stored procedure that populates the target table.
    job_tb_airflow_devops = BigQueryExecuteQueryOperator(
        task_id='job_tb_airflow_devops',
        sql='CALL prc_load_tb_real_dia_cupom',
        use_legacy_sql=False,
        depends_on_past=False,
        priority='BATCH',
    )

    # Placeholder DQ step for this table (no-op for now).
    dq_tb_airflow_devops = DummyOperator(
        task_id='dq_tb_airflow_devops',
    )

    # sensor -> upstream DQ -> load -> own DQ
    sensor_tb_loja_venda_so >> dq_tb_loja_venda_so >> job_tb_airflow_devops >> dq_tb_airflow_devops
60 changes: 60 additions & 0 deletions dags/generated/dag_teste.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from libs.bigquery_operator import BigQueryExecuteQueryOperator
from libs.last_execution_external_task_sensor import LastExecutionExternalTaskSensor

# Shared task-level defaults applied to every operator in this DAG.
DEFAULT_ARGS = {
    'owner': 'Gerencia: Front, Coord: RGM',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

# Daily DAG (09:00) generated by the DAG factory for the "teste" job.
dag = DAG(
    'dag_teste',
    start_date=datetime(2023, 1, 1),
    schedule_interval='0 9 * * *',
    default_args=DEFAULT_ARGS,
    catchup=False,
    tags=['factory'],
)

with dag:
    # Wait for the most recent run of the upstream table's DQ task.
    sensor_tb_loja_venda_so = LastExecutionExternalTaskSensor(
        task_id='sensor_tb_loja_venda_so',
        external_dag_id='dag_tb_loja_venda_so',
        external_task_id='dq_tb_loja_venda_so',
        poke_interval=120,                # 2 minutes between pokes
        timeout=600,                      # give up after 10 minutes
        failed_states=['failed', 'skipped', 'upstream_failed'],
    )

    # Re-run data quality on the upstream dependency before loading.
    dq_tb_loja_venda_so = BigQueryExecuteQueryOperator(
        task_id='dq_tb_loja_venda_so',
        sql='CALL prc_data_quality',
        use_legacy_sql=False,
        depends_on_past=False,
        priority='BATCH',
    )

    # Main load: stored procedure that populates the target table.
    job_teste = BigQueryExecuteQueryOperator(
        task_id='job_teste',
        sql='CALL prc_load_tb_real_dia_cupom',
        use_legacy_sql=False,
        depends_on_past=False,
        priority='BATCH',
    )

    # Placeholder DQ step for this table (no-op for now).
    dq_teste = DummyOperator(
        task_id='dq_teste',
    )

    # sensor -> upstream DQ -> load -> own DQ
    sensor_tb_loja_venda_so >> dq_tb_loja_venda_so >> job_teste >> dq_teste