From a0e7d6db56176008d05ce7b86127101e11bb35bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Dziadosz?= Date: Thu, 16 Mar 2023 08:05:41 +0100 Subject: [PATCH 01/10] Add dbt-cloud integration command to dp cli --- data_pipelines_cli/cli.py | 2 + data_pipelines_cli/cli_commands/cloud.py | 100 +++++++++ data_pipelines_cli/dbt_cloud_api_client.py | 225 +++++++++++++++++++++ 3 files changed, 327 insertions(+) create mode 100644 data_pipelines_cli/cli_commands/cloud.py create mode 100644 data_pipelines_cli/dbt_cloud_api_client.py diff --git a/data_pipelines_cli/cli.py b/data_pipelines_cli/cli.py index c6845e9..e6e4dd7 100644 --- a/data_pipelines_cli/cli.py +++ b/data_pipelines_cli/cli.py @@ -16,6 +16,7 @@ from .cli_commands.template import list_templates_command from .cli_commands.test import test_command from .cli_commands.update import update_command +from .cli_commands.cloud import configure_cloud_command from .cli_utils import echo_error, echo_suberror from .errors import DataPipelinesError @@ -50,3 +51,4 @@ def cli() -> None: _cli.add_command(seed_command) _cli.add_command(test_command) _cli.add_command(update_command) +_cli.add_command(configure_cloud_command) diff --git a/data_pipelines_cli/cli_commands/cloud.py b/data_pipelines_cli/cli_commands/cloud.py new file mode 100644 index 0000000..5037b59 --- /dev/null +++ b/data_pipelines_cli/cli_commands/cloud.py @@ -0,0 +1,100 @@ +import json +from typing import Any, Dict, Optional + +import click + +from data_pipelines_cli.dbt_cloud_api_client import DbtCloudApiClient +from ..cli_constants import BUILD_DIR +from ..cli_utils import echo_info +from ..config_generation import read_dictionary_from_config_directory + + +def read_bigquery_config(env: str) -> Dict[str, Any]: + """ + Read Bigquery configuration. + + :param env: Name of the environment + :type env: str + :return: Compiled dictionary + :rtype: Dict[str, Any] + """ + return read_dictionary_from_config_directory(BUILD_DIR.joinpath("dag"), env, "bigquery.yml") + + +@click.command(name="configure-cloud", help="Create dbt Cloud project") +@click.option( + "--account_id", + type=int, + required=True, + help="""dbt Cloud Account identifier To find your user ID in dbt Cloud, read the following steps: + 1. Go to Account Settings, Team, and then Users, + 2. Select your user, + 3. In the address bar, the number after /users is your user ID.""", +) +@click.option( + "--token", + type=str, + required=True, + help="API token for your DBT Cloud account. " + "You can find your User API token in the Profile page under the API Access label", +) +@click.option( + "--remote_url", + type=str, + required=True, + help="Note: After creating a dbt Cloud repository's SSH key, you will need to add the generated key text as" + " a deploy key to the target repository. This gives dbt Cloud permissions to read / write in the repository." 
+) +@click.option( + "--project_name", + type=str, + required=False, + default="Data Pipelines Project", + help="Project Name", +) +@click.option("--keyfile", required=False, help="Bigquery keyfile") +@click.option("--env", default="local", type=str, help="Name of the environment", show_default=True) +def configure_cloud_command( + account_id: int, + token: str, + remote_url: str, + project_name: str, + keyfile: Optional[str], + env: str, +) -> None: + client = DbtCloudApiClient(f"https://cloud.getdbt.com/api", account_id, token) + + bigquery_config = read_bigquery_config(env) + + if keyfile is None and not bigquery_config.get("method") == "service-account": + echo_info("dbt Cloud requires service account") + return + + file = open(keyfile or bigquery_config["keyfile"]) + keyfile_data = json.load(file) + + project_id = client.create_project(project_name) + (repository_id, deploy_key) = client.create_repository(project_id, remote_url) + echo_info("You need to add the generated key text as a deploy key to the target repository.\n" + "This gives dbt Cloud permissions to read / write in the repository\n" + f"{deploy_key}") + + credentials_id = client.create_credentials(bigquery_config["dataset"], project_id) + client.create_development_environment(project_id, credentials_id) + connection_id = client.create_bigquery_connection( + project_id=project_id, + name="BQ Connection Name", + is_active=True, + gcp_project_id=keyfile_data["project_id"], + timeout_seconds=100, + private_key_id=keyfile_data["private_key_id"], + private_key=keyfile_data["private_key"], + client_email=keyfile_data["client_email"], + client_id=keyfile_data["client_id"], + auth_uri=keyfile_data["auth_uri"], + token_uri=keyfile_data["token_uri"], + auth_provider_x509_cert_url=keyfile_data["auth_provider_x509_cert_url"], + client_x509_cert_url=keyfile_data["client_x509_cert_url"] + ) + + client.associate_connection_repository(project_name, project_id, connection_id, repository_id) diff --git a/data_pipelines_cli/dbt_cloud_api_client.py b/data_pipelines_cli/dbt_cloud_api_client.py new file mode 100644 index 0000000..b08e254 --- /dev/null +++ b/data_pipelines_cli/dbt_cloud_api_client.py @@ -0,0 +1,225 @@ +import json +import requests + + +class DbtCloudApiClient: + """A class used to create dbt Cloud project using API v3""" + + def __init__(self, host_url, account_id, token): + self.host_url = host_url + """Base URL differs for Multi and Single-Tenant Deployments""" + + self.account_id = account_id + """ + To find your user ID in dbt Cloud, read the following steps: + 1. Go to Account Settings, Team, and then Users, + 2. Select your user, + 3. In the address bar, the number after /users is your user ID. + """ + + self.token = token + """You can find your User API token in the Profile page under the API Access label""" + + def request(self, url, data): + headers = { + "Accept": "application/json", + "Content-Type": "application/json", + "Authorization": f"Token {self.token}" + } + response = requests.post( + url=url, + data=data, + headers=headers + ) + res = json.loads(response.content) + if not res["status"]["is_success"]: + raise Exception(res["status"]["user_message"] + "\n" + res["data"]) + return res + + def create_project(self, name): + """ + Note: the dbt_project_subdirectory is an optional field which allows you to point + dbt Cloud to a subdirectory that lives within the root folder of your target repository. + This means dbt Cloud will look for a dbt_project.yml file at that location. 
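+
+        A minimal usage sketch (the account ID and token below are illustrative
+        placeholders, not real credentials)::
+
+            client = DbtCloudApiClient("https://cloud.getdbt.com/api", 12345, "<token>")
+            project_id = client.create_project("Data Pipelines Project")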
+ + :param name: Name of the project + :return: ID of created project + """ + new_project = { + "id": None, + "account_id": self.account_id, + "name": name, + "dbt_project_subdirectory": None, + "connection_id": None, + "repository_id": None + } + + new_project_data = json.dumps(new_project) + + response = self.request(f"{self.host_url}/v3/accounts/{str(self.account_id)}/projects/", new_project_data) + return response["data"]["id"] + + def create_repository(self, project_id, git_clone_url): + """ + Note: After creating a dbt Cloud repository's SSH key, you will need to add the generated key text as + a deploy key to the target repository. This gives dbt Cloud permissions to read / write in the repository. + + :param git_clone_url: Repository remote url + :param project_id: ID of the project + :return: repository ID and deploy key + """ + new_repository = { + "account_id": self.account_id, + "project_id": project_id, + "remote_url": git_clone_url, + "git_clone_strategy": "deploy_key", + "github_installation_id": None, + "token_str": None + } + + new_repository_data = json.dumps(new_repository) + + response = self.request( + f"{self.host_url}/v3/accounts/{str(self.account_id)}/projects/{str(project_id)}/repositories/", + new_repository_data) + return response["data"]["id"], response["data"]["deploy_key"]["public_key"] + + def create_development_environment(self, project_id, credentials_id): + """ + Create development environment + + :param name: Name of the project + :return: ID of created project + """ + new_env = { + "id": None, + "type": "development", + "name": "Development", + "account_id": self.account_id, + "project_id": project_id, + "state": 1, + "use_custom_branch": False, + "custom_branch": None, + "dbt_version": "1.0.0", + "supports_docs": False, + "credentials_id": credentials_id + } + + new_env_data = json.dumps(new_env) + + response = self.request( + f"{self.host_url}/v3/accounts/{str(self.account_id)}/projects/{str(project_id)}/environments/", + new_env_data) + return response["data"]["id"] + + def associate_connection_repository(self, name, project_id, connection_id=None, repository_id=None): + """ + Link connection and repository to project + + :param name: Name of the project + :param project_id: ID of the project + :param connection_id: ID of the connection to be associated + :param repository_id: ID of the repository to be associated + :return: ID of the project + """ + new_connection = { + "name": name, + "account_id": self.account_id, + "id": project_id, + "connection_id": connection_id, + "repository_id": repository_id + } + + new_connection_data = json.dumps(new_connection) + response = self.request(f"{self.host_url}/v3/accounts/{str(self.account_id)}/projects/{str(project_id)}", + new_connection_data) + + return response["data"]["id"] + + def create_credentials(self, schema, project_id): + """ + Create credentials + + :param schema: Default deployment dataset + :param project_id: ID of the project + :return: ID of created credentials + """ + new_credentials = { + "id": None, + "account_id": self.account_id, + "project_id": project_id, + "type": "bigquery", + "state": 1, + "threads": 4, + "schema": schema, + "target_name": "default", + "created_at": None, + "updated_at": None, + "username": None, + "has_refresh_token": False + } + + new_credentials_data = json.dumps(new_credentials) + response = self.request( + f"{self.host_url}/v3/accounts/{str(self.account_id)}/projects/{str(project_id)}/credentials/", + new_credentials_data) + + return response["data"]["id"] + + 
def create_bigquery_connection(
+            self,
+            project_id,
+            name,
+            is_active,
+            gcp_project_id,
+            timeout_seconds,
+            private_key_id,
+            private_key,
+            client_email,
+            client_id,
+            auth_uri,
+            token_uri,
+            auth_provider_x509_cert_url,
+            client_x509_cert_url,
+            retries=1,
+            location=None,
+            maximum_bytes_billed=0
+    ):
+        """
+        Creates a dbt Cloud connection to BigQuery.
+
+        :param project_id: ID of the project
+        :param name: Name of the connection
+        :param is_active: whether the connection should be active
+        :param gcp_project_id: ID of the GCP project to connect to (taken from the keyfile)
+        :return: ID of the created connection
+        """
+        connection_details = {
+            "project_id": gcp_project_id,
+            "timeout_seconds": timeout_seconds,
+            "private_key_id": private_key_id,
+            "private_key": private_key,
+            "client_email": client_email,
+            "client_id": client_id,
+            "auth_uri": auth_uri,
+            "token_uri": token_uri,
+            "auth_provider_x509_cert_url": auth_provider_x509_cert_url,
+            "client_x509_cert_url": client_x509_cert_url,
+            "retries": retries,
+            "location": location,
+            "maximum_bytes_billed": maximum_bytes_billed
+        }
+
+        new_connection = {
+            "id": None,
+            "account_id": self.account_id,
+            "project_id": project_id,
+            "name": name,
+            "type": "bigquery",
+            "state": 1 if is_active else 0,
+            "details": connection_details,
+        }
+
+        new_connection_data = json.dumps(new_connection).encode()
+        response = self.request(f"{self.host_url}/v3/accounts/{self.account_id}/projects/{project_id}/connections/",
+                                new_connection_data)
+
+        return response["data"]["id"]

From 21132c13c275fe460e2e8f107754f522b1547768 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rados=C5=82aw=20Dziadosz?=
Date: Tue, 21 Mar 2023 08:28:28 +0100
Subject: [PATCH 02/10] Add dbt-cloud integration command to dp cli

---
 data_pipelines_cli/cli_commands/cloud.py   | 45 ++++++++--------
 data_pipelines_cli/dbt_cloud_api_client.py |  7 ++--
 2 files changed, 20 insertions(+), 32 deletions(-)

diff --git a/data_pipelines_cli/cli_commands/cloud.py b/data_pipelines_cli/cli_commands/cloud.py
index 5037b59..992a1ec 100644
--- a/data_pipelines_cli/cli_commands/cloud.py
+++ b/data_pipelines_cli/cli_commands/cloud.py
@@ -1,24 +1,9 @@
 import json
-from typing import Any, Dict, Optional
 
 import click
 
 from data_pipelines_cli.dbt_cloud_api_client import DbtCloudApiClient
-from ..cli_constants import BUILD_DIR
 from ..cli_utils import echo_info
-from ..config_generation import read_dictionary_from_config_directory
-
-
-def read_bigquery_config(env: str) -> Dict[str, Any]:
-    """
-    Read Bigquery configuration.
- - :param env: Name of the environment - :type env: str - :return: Compiled dictionary - :rtype: Dict[str, Any] - """ - return read_dictionary_from_config_directory(BUILD_DIR.joinpath("dag"), env, "bigquery.yml") @click.command(name="configure-cloud", help="Create dbt Cloud project") @@ -52,25 +37,29 @@ def read_bigquery_config(env: str) -> Dict[str, Any]: default="Data Pipelines Project", help="Project Name", ) -@click.option("--keyfile", required=False, help="Bigquery keyfile") -@click.option("--env", default="local", type=str, help="Name of the environment", show_default=True) +@click.option( + "--keyfile", + type=str, + required=True, + help="Bigquery keyfile" +) +@click.option( + "--dataset", + type=str, + required=True, + help="Name of the dataset" +) def configure_cloud_command( account_id: int, token: str, remote_url: str, project_name: str, - keyfile: Optional[str], - env: str, + keyfile: str, + dataset: str, ) -> None: client = DbtCloudApiClient(f"https://cloud.getdbt.com/api", account_id, token) - bigquery_config = read_bigquery_config(env) - - if keyfile is None and not bigquery_config.get("method") == "service-account": - echo_info("dbt Cloud requires service account") - return - - file = open(keyfile or bigquery_config["keyfile"]) + file = open(keyfile) keyfile_data = json.load(file) project_id = client.create_project(project_name) @@ -79,8 +68,8 @@ def configure_cloud_command( "This gives dbt Cloud permissions to read / write in the repository\n" f"{deploy_key}") - credentials_id = client.create_credentials(bigquery_config["dataset"], project_id) - client.create_development_environment(project_id, credentials_id) + client.create_credentials(dataset, project_id) + client.create_development_environment(project_id) connection_id = client.create_bigquery_connection( project_id=project_id, name="BQ Connection Name", diff --git a/data_pipelines_cli/dbt_cloud_api_client.py b/data_pipelines_cli/dbt_cloud_api_client.py index b08e254..fb98780 100644 --- a/data_pipelines_cli/dbt_cloud_api_client.py +++ b/data_pipelines_cli/dbt_cloud_api_client.py @@ -84,12 +84,12 @@ def create_repository(self, project_id, git_clone_url): new_repository_data) return response["data"]["id"], response["data"]["deploy_key"]["public_key"] - def create_development_environment(self, project_id, credentials_id): + def create_development_environment(self, project_id): """ Create development environment - :param name: Name of the project - :return: ID of created project + :param project_id: ID of the project + :return: ID of created environment """ new_env = { "id": None, @@ -102,7 +102,6 @@ def create_development_environment(self, project_id, credentials_id): "custom_branch": None, "dbt_version": "1.0.0", "supports_docs": False, - "credentials_id": credentials_id } new_env_data = json.dumps(new_env) From e2427e6a84c2bb981bc2a4320debe73b12b4c563 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Dziadosz?= Date: Wed, 22 Mar 2023 01:55:08 +0100 Subject: [PATCH 03/10] Add dbt-cloud integration command to dp cli --- data_pipelines_cli/cli.py | 2 +- .../cli_commands/{cloud.py => dbtcloud.py} | 11 +++++------ data_pipelines_cli/dbt_cloud_api_client.py | 4 +++- 3 files changed, 9 insertions(+), 8 deletions(-) rename data_pipelines_cli/cli_commands/{cloud.py => dbtcloud.py} (83%) diff --git a/data_pipelines_cli/cli.py b/data_pipelines_cli/cli.py index e6e4dd7..17bb6b1 100644 --- a/data_pipelines_cli/cli.py +++ b/data_pipelines_cli/cli.py @@ -16,7 +16,7 @@ from .cli_commands.template import list_templates_command 
from .cli_commands.test import test_command from .cli_commands.update import update_command -from .cli_commands.cloud import configure_cloud_command +from .cli_commands.dbtcloud import configure_cloud_command from .cli_utils import echo_error, echo_suberror from .errors import DataPipelinesError diff --git a/data_pipelines_cli/cli_commands/cloud.py b/data_pipelines_cli/cli_commands/dbtcloud.py similarity index 83% rename from data_pipelines_cli/cli_commands/cloud.py rename to data_pipelines_cli/cli_commands/dbtcloud.py index 992a1ec..652f9db 100644 --- a/data_pipelines_cli/cli_commands/cloud.py +++ b/data_pipelines_cli/cli_commands/dbtcloud.py @@ -11,17 +11,16 @@ "--account_id", type=int, required=True, - help="""dbt Cloud Account identifier To find your user ID in dbt Cloud, read the following steps: - 1. Go to Account Settings, Team, and then Users, - 2. Select your user, - 3. In the address bar, the number after /users is your user ID.""", + help="""dbt Cloud Account identifier To obtain your dbt Cloud account ID, sign into dbt Cloud in your browser. + Take note of the number directly following the accounts path component of the URL - this is your account ID""", ) @click.option( "--token", type=str, required=True, - help="API token for your DBT Cloud account. " - "You can find your User API token in the Profile page under the API Access label", + help="API token for your DBT Cloud account." + "You can retrieve your User API token from your User Profile (top right icon) > API Access." + "You can also use Service Token. Retrieve it from Account Settings > Service Tokens > Create Service Token.", ) @click.option( "--remote_url", diff --git a/data_pipelines_cli/dbt_cloud_api_client.py b/data_pipelines_cli/dbt_cloud_api_client.py index fb98780..323c690 100644 --- a/data_pipelines_cli/dbt_cloud_api_client.py +++ b/data_pipelines_cli/dbt_cloud_api_client.py @@ -86,7 +86,9 @@ def create_repository(self, project_id, git_clone_url): def create_development_environment(self, project_id): """ - Create development environment + Create development environment. Environments encompass a collection of settings for how you want to run + your dbt project. This includes: dbt version, git branch, data location (target schema). + :param project_id: ID of the project :return: ID of created environment From 4a4835faf007210755a463eac859765a1732f28a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Dziadosz?= Date: Mon, 3 Apr 2023 15:59:51 +0200 Subject: [PATCH 04/10] Add dbt-cloud integration command to dp cli --- data_pipelines_cli/cli_commands/dbtcloud.py | 55 ++++++++++------ data_pipelines_cli/dbt_cloud_api_client.py | 69 ++++++++++++++++++--- 2 files changed, 98 insertions(+), 26 deletions(-) diff --git a/data_pipelines_cli/cli_commands/dbtcloud.py b/data_pipelines_cli/cli_commands/dbtcloud.py index 652f9db..dadfa4b 100644 --- a/data_pipelines_cli/cli_commands/dbtcloud.py +++ b/data_pipelines_cli/cli_commands/dbtcloud.py @@ -1,9 +1,24 @@ import json import click +from typing import Any, Dict +from ..cli_constants import BUILD_DIR from data_pipelines_cli.dbt_cloud_api_client import DbtCloudApiClient from ..cli_utils import echo_info +from ..config_generation import read_dictionary_from_config_directory + + +def read_dbtcloud_config(env: str) -> Dict[str, Any]: + """ + Read dbt Cloud configuration. 
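+
+    A usage sketch (the environment name is an illustrative placeholder; the
+    returned keys follow the ``dbtcloud.yml`` schema used by this command)::
+
+        config = read_dbtcloud_config("local")
+        project_name = config["project_name"]
+        environments = config["environments"]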
+ + :param env: Name of the environment + :type env: str + :return: Compiled dictionary + :rtype: Dict[str, Any] + """ + return read_dictionary_from_config_directory(BUILD_DIR.joinpath("dag"), env, "dbtcloud.yml") @click.command(name="configure-cloud", help="Create dbt Cloud project") @@ -26,16 +41,10 @@ "--remote_url", type=str, required=True, - help="Note: After creating a dbt Cloud repository's SSH key, you will need to add the generated key text as" + help="Git stores remote URL" + " Note: After creating a dbt Cloud repository's SSH key, you will need to add the generated key text as" " a deploy key to the target repository. This gives dbt Cloud permissions to read / write in the repository." ) -@click.option( - "--project_name", - type=str, - required=False, - default="Data Pipelines Project", - help="Project Name", -) @click.option( "--keyfile", type=str, @@ -43,32 +52,40 @@ help="Bigquery keyfile" ) @click.option( - "--dataset", + "--env", + default="local", type=str, - required=True, - help="Name of the dataset" -) + help="Name of the environment", + show_default=True) def configure_cloud_command( account_id: int, token: str, remote_url: str, - project_name: str, keyfile: str, - dataset: str, + env: str ) -> None: client = DbtCloudApiClient(f"https://cloud.getdbt.com/api", account_id, token) + dbtcloud_config = read_dbtcloud_config(env) file = open(keyfile) keyfile_data = json.load(file) - - project_id = client.create_project(project_name) + project_id = client.create_project(dbtcloud_config["project_name"]) (repository_id, deploy_key) = client.create_repository(project_id, remote_url) echo_info("You need to add the generated key text as a deploy key to the target repository.\n" "This gives dbt Cloud permissions to read / write in the repository\n" f"{deploy_key}") - client.create_credentials(dataset, project_id) - client.create_development_environment(project_id) + for environment in dbtcloud_config["environments"]: + if environment["type"] == "deployment": + credentials_id = client.create_credentials(environment["dataset"], project_id) + else: + credentials_id = None + environment_id = client.create_environment(project_id, environment["type"], environment["name"], + environment["dbt_version"], credentials_id) + if environment["type"] == "deployment": + client.create_job(project_id, environment_id, dbtcloud_config["schedule_interval"], + "Job - " + environment["name"]) + connection_id = client.create_bigquery_connection( project_id=project_id, name="BQ Connection Name", @@ -85,4 +102,4 @@ def configure_cloud_command( client_x509_cert_url=keyfile_data["client_x509_cert_url"] ) - client.associate_connection_repository(project_name, project_id, connection_id, repository_id) + client.associate_connection_repository(dbtcloud_config["project_name"], project_id, connection_id, repository_id) diff --git a/data_pipelines_cli/dbt_cloud_api_client.py b/data_pipelines_cli/dbt_cloud_api_client.py index 323c690..ae44e36 100644 --- a/data_pipelines_cli/dbt_cloud_api_client.py +++ b/data_pipelines_cli/dbt_cloud_api_client.py @@ -84,26 +84,30 @@ def create_repository(self, project_id, git_clone_url): new_repository_data) return response["data"]["id"], response["data"]["deploy_key"]["public_key"] - def create_development_environment(self, project_id): + def create_environment(self, project_id, env_type, name, dbt_version, credentials_id=None): """ - Create development environment. Environments encompass a collection of settings for how you want to run + Create environment. 
Environments encompass a collection of settings for how you want to run your dbt project. This includes: dbt version, git branch, data location (target schema). - + :param name: Name of the environment + :param env_type: type of environment (development/deployment) :param project_id: ID of the project + :param credentials_id: ID of credentials to be used by environment + :param dbt_version: dbt version that should be used by this environment :return: ID of created environment """ new_env = { "id": None, - "type": "development", - "name": "Development", + "type": env_type, + "name": name, "account_id": self.account_id, "project_id": project_id, "state": 1, "use_custom_branch": False, "custom_branch": None, - "dbt_version": "1.0.0", + "dbt_version": dbt_version, "supports_docs": False, + "credentials_id": credentials_id } new_env_data = json.dumps(new_env) @@ -139,7 +143,7 @@ def associate_connection_repository(self, name, project_id, connection_id=None, def create_credentials(self, schema, project_id): """ - Create credentials + Creates credentials - these are needed to create the environment. :param schema: Default deployment dataset :param project_id: ID of the project @@ -224,3 +228,54 @@ def create_bigquery_connection( new_connection_data) return response["data"]["id"] + + def create_job(self, project_id, environment_id, schedule_cron, name): + """ + Creates sample job for given project and environment. Job is triggered by the scheduler executes commands: + dbt seed, dbt test and dbt run. + :param project_id: ID of the project + :param environment_id: ID of the environment + :param schedule_cron: Schedule (cron syntax) + :param name: Name of the job + :return: ID of created job + """ + job_details = { + "account_id": self.account_id, + "project_id": project_id, + "id": None, + "environment_id": environment_id, + "name": name, + "dbt_version": None, + "triggers": { + "schedule": True, + "github_webhook": False + }, + "execute_steps": [ + "dbt seed", + "dbt test", + "dbt run" + ], + "settings": { + "threads": 1, + "target_name": "default" + }, + "execution": { + "timeout_seconds": 600 + }, + "state": 1, + "schedule": { + "cron": schedule_cron, + "date": { + "type": "every_day" + }, + "time": { + "type": "every_hour", + "interval": 1 + } + } + } + + job_details_data = json.dumps(job_details).encode() + response = self.request(f"{self.host_url}/v2/accounts/{self.account_id}/jobs/", job_details_data) + + return response["data"]["id"] From 1ddbb0996243c6f4dac6028cc0df94b7fa10a232 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Dziadosz?= Date: Tue, 25 Apr 2023 08:48:16 +0200 Subject: [PATCH 05/10] Add dbt-cloud integration command to dp cli # Environment variable --- data_pipelines_cli/cli_commands/dbtcloud.py | 46 ++++++++++++++++----- data_pipelines_cli/dbt_cloud_api_client.py | 25 +++++++++++ 2 files changed, 61 insertions(+), 10 deletions(-) diff --git a/data_pipelines_cli/cli_commands/dbtcloud.py b/data_pipelines_cli/cli_commands/dbtcloud.py index dadfa4b..c3bfff8 100644 --- a/data_pipelines_cli/cli_commands/dbtcloud.py +++ b/data_pipelines_cli/cli_commands/dbtcloud.py @@ -18,7 +18,19 @@ def read_dbtcloud_config(env: str) -> Dict[str, Any]: :return: Compiled dictionary :rtype: Dict[str, Any] """ - return read_dictionary_from_config_directory(BUILD_DIR.joinpath("dag"), env, "dbtcloud.yml") + return read_dictionary_from_config_directory(BUILD_DIR.joinpath("dag"), ".", "dbtcloud.yml") + + +def read_bigquery_config(env: str) -> Dict[str, Any]: + """ + Read dbt Cloud configuration. 
+ + :param env: Name of the environment + :type env: str + :return: Compiled dictionary + :rtype: Dict[str, Any] + """ + return read_dictionary_from_config_directory(BUILD_DIR.joinpath("dag"), env, "bigquery.yml") @click.command(name="configure-cloud", help="Create dbt Cloud project") @@ -75,22 +87,28 @@ def configure_cloud_command( "This gives dbt Cloud permissions to read / write in the repository\n" f"{deploy_key}") + environments_projects = {} for environment in dbtcloud_config["environments"]: - if environment["type"] == "deployment": - credentials_id = client.create_credentials(environment["dataset"], project_id) - else: - credentials_id = None - environment_id = client.create_environment(project_id, environment["type"], environment["name"], - environment["dbt_version"], credentials_id) + environment_id = create_environment(client, environment, project_id) if environment["type"] == "deployment": client.create_job(project_id, environment_id, dbtcloud_config["schedule_interval"], "Job - " + environment["name"]) + bq_config = read_bigquery_config(environment["bq_config_dir"]) + environments_projects[environment["name"]] = bq_config["project"] + + client.create_environment_variable(project_id, dbtcloud_config["default_gcp_project"], environments_projects) - connection_id = client.create_bigquery_connection( + connection_id = create_bq_connection(client, keyfile_data, project_id) + + client.associate_connection_repository(dbtcloud_config["project_name"], project_id, connection_id, repository_id) + + +def create_bq_connection(client, keyfile_data, project_id): + return client.create_bigquery_connection( project_id=project_id, name="BQ Connection Name", is_active=True, - gcp_project_id=keyfile_data["project_id"], + gcp_project_id="{{ env_var(\"DBT_GCP_PROJECT\") }} ", timeout_seconds=100, private_key_id=keyfile_data["private_key_id"], private_key=keyfile_data["private_key"], @@ -102,4 +120,12 @@ def configure_cloud_command( client_x509_cert_url=keyfile_data["client_x509_cert_url"] ) - client.associate_connection_repository(dbtcloud_config["project_name"], project_id, connection_id, repository_id) + +def create_environment(client, environment, project_id): + if environment["type"] == "deployment": + credentials_id = client.create_credentials(environment["dataset"], project_id) + else: + credentials_id = None + environment_id = client.create_environment(project_id, environment["type"], environment["name"], + environment["dbt_version"], credentials_id) + return environment_id diff --git a/data_pipelines_cli/dbt_cloud_api_client.py b/data_pipelines_cli/dbt_cloud_api_client.py index ae44e36..ae63ad4 100644 --- a/data_pipelines_cli/dbt_cloud_api_client.py +++ b/data_pipelines_cli/dbt_cloud_api_client.py @@ -117,6 +117,31 @@ def create_environment(self, project_id, env_type, name, dbt_version, credential new_env_data) return response["data"]["id"] + def create_environment_variable(self, project_id, default, environments): + """ + Create environment variable. Note: Environment variables must be prefixed with DBT_ or DBT_ENV_SECRET_ . 
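+
+        An illustrative call (the environment names and GCP project IDs below are
+        placeholder values)::
+
+            client.create_environment_variable(
+                project_id,
+                "default-gcp-project",
+                {"Develop": "dev-gcp-project", "Production": "prod-gcp-project"},
+            )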
+ + :param project_id: ID of the project + :param environments: dict which contains the value of the variable for each environment + :param default: default environment variable value for project + :return: IDs of created environment variable + """ + env_var = { + "new_name": "DBT_GCP_PROJECT", + "project": default + } + env_var.update(environments) + new_env = { + "env_var": env_var + } + print(new_env) + new_env_data = json.dumps(new_env) + + response = self.request( + f"{self.host_url}/v3/accounts/{str(self.account_id)}/projects/{str(project_id)}/environment-variables/bulk/", + new_env_data) + return response["data"]["new_var_ids"] + def associate_connection_repository(self, name, project_id, connection_id=None, repository_id=None): """ Link connection and repository to project From ad56045bd7aede951852497ac39e7d540ae6afaa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Dziadosz?= Date: Tue, 25 Apr 2023 15:46:36 +0200 Subject: [PATCH 06/10] Add dbt-cloud integration command to dp cli # Code formatting --- data_pipelines_cli/cli_commands/dbtcloud.py | 57 ++++++++++++--------- data_pipelines_cli/dbt_cloud_api_client.py | 33 +++++++----- 2 files changed, 54 insertions(+), 36 deletions(-) diff --git a/data_pipelines_cli/cli_commands/dbtcloud.py b/data_pipelines_cli/cli_commands/dbtcloud.py index c3bfff8..ec6e974 100644 --- a/data_pipelines_cli/cli_commands/dbtcloud.py +++ b/data_pipelines_cli/cli_commands/dbtcloud.py @@ -1,15 +1,15 @@ import json +from typing import Any, Dict import click -from typing import Any, Dict -from ..cli_constants import BUILD_DIR from data_pipelines_cli.dbt_cloud_api_client import DbtCloudApiClient +from ..cli_constants import BUILD_DIR from ..cli_utils import echo_info from ..config_generation import read_dictionary_from_config_directory -def read_dbtcloud_config(env: str) -> Dict[str, Any]: +def read_dbtcloud_config() -> Dict[str, Any]: """ Read dbt Cloud configuration. @@ -38,24 +38,25 @@ def read_bigquery_config(env: str) -> Dict[str, Any]: "--account_id", type=int, required=True, - help="""dbt Cloud Account identifier To obtain your dbt Cloud account ID, sign into dbt Cloud in your browser. - Take note of the number directly following the accounts path component of the URL - this is your account ID""", + help="""dbt Cloud Account identifier To obtain your dbt Cloud account ID, sign into dbt Cloud + in your browser. Take note of the number directly following the accounts path component of the + URL - this is your account ID""", ) @click.option( "--token", type=str, required=True, - help="API token for your DBT Cloud account." - "You can retrieve your User API token from your User Profile (top right icon) > API Access." - "You can also use Service Token. Retrieve it from Account Settings > Service Tokens > Create Service Token.", + help="""API token for your DBT Cloud account. You can retrieve your User API token from your + User Profile (top right icon) > API Access. You can also use Service Token. Retrieve it from + Account Settings > Service Tokens > Create Service Token.""", ) @click.option( "--remote_url", type=str, required=True, - help="Git stores remote URL" - " Note: After creating a dbt Cloud repository's SSH key, you will need to add the generated key text as" - " a deploy key to the target repository. This gives dbt Cloud permissions to read / write in the repository." + help="""Git stores remote URL Note: After creating a dbt Cloud repository's SSH key, you will + need to add the generated key text as a deploy key to the target repository. 
This gives dbt + Cloud permissions to read / write in the repository.""" ) @click.option( "--keyfile", @@ -63,22 +64,14 @@ def read_bigquery_config(env: str) -> Dict[str, Any]: required=True, help="Bigquery keyfile" ) -@click.option( - "--env", - default="local", - type=str, - help="Name of the environment", - show_default=True) def configure_cloud_command( account_id: int, token: str, remote_url: str, - keyfile: str, - env: str -) -> None: + keyfile: str) -> None: client = DbtCloudApiClient(f"https://cloud.getdbt.com/api", account_id, token) - dbtcloud_config = read_dbtcloud_config(env) + dbtcloud_config = read_dbtcloud_config() file = open(keyfile) keyfile_data = json.load(file) project_id = client.create_project(dbtcloud_config["project_name"]) @@ -96,14 +89,24 @@ def configure_cloud_command( bq_config = read_bigquery_config(environment["bq_config_dir"]) environments_projects[environment["name"]] = bq_config["project"] - client.create_environment_variable(project_id, dbtcloud_config["default_gcp_project"], environments_projects) + client.create_environment_variable(project_id, dbtcloud_config["default_gcp_project"], + environments_projects) connection_id = create_bq_connection(client, keyfile_data, project_id) - client.associate_connection_repository(dbtcloud_config["project_name"], project_id, connection_id, repository_id) + client.associate_connection_repository(dbtcloud_config["project_name"], project_id, + connection_id, repository_id) def create_bq_connection(client, keyfile_data, project_id): + """ + Creates a connection to the bigquery warehouse in the dbt Cloud project. + + :param client: API Client + :param keyfile_data: Data read from Bigquery keyfile + :param project_id: ID of the project in which the connection is to be created + :return: ID of the created connection + """ return client.create_bigquery_connection( project_id=project_id, name="BQ Connection Name", @@ -122,6 +125,14 @@ def create_bq_connection(client, keyfile_data, project_id): def create_environment(client, environment, project_id): + """ + Creates a dbt Cloud environment with the specified configuration + + :param client: API Client + :param environment: Config of environment to be created + :param project_id: ID of the project + :return: ID of created environment + """ if environment["type"] == "deployment": credentials_id = client.create_credentials(environment["dataset"], project_id) else: diff --git a/data_pipelines_cli/dbt_cloud_api_client.py b/data_pipelines_cli/dbt_cloud_api_client.py index ae63ad4..62e3cf4 100644 --- a/data_pipelines_cli/dbt_cloud_api_client.py +++ b/data_pipelines_cli/dbt_cloud_api_client.py @@ -56,13 +56,15 @@ def create_project(self, name): new_project_data = json.dumps(new_project) - response = self.request(f"{self.host_url}/v3/accounts/{str(self.account_id)}/projects/", new_project_data) + response = self.request(f"{self.host_url}/v3/accounts/{str(self.account_id)}/projects/", + new_project_data) return response["data"]["id"] def create_repository(self, project_id, git_clone_url): """ - Note: After creating a dbt Cloud repository's SSH key, you will need to add the generated key text as - a deploy key to the target repository. This gives dbt Cloud permissions to read / write in the repository. + Note: After creating a dbt Cloud repository's SSH key, you will need to add the generated + key text as a deploy key to the target repository. This gives dbt Cloud permissions to + read / write in the repository. 
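+
+        Example (the remote URL below is an illustrative placeholder; the returned
+        pair is the repository ID and the public deploy key)::
+
+            repository_id, deploy_key = client.create_repository(
+                project_id, "git@github.com:org/repo.git")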
:param git_clone_url: Repository remote url :param project_id: ID of the project @@ -87,7 +89,7 @@ def create_repository(self, project_id, git_clone_url): def create_environment(self, project_id, env_type, name, dbt_version, credentials_id=None): """ Create environment. Environments encompass a collection of settings for how you want to run - your dbt project. This includes: dbt version, git branch, data location (target schema). + your dbt project. This includes: dbt version, git branch, data location (target schema). :param name: Name of the environment :param env_type: type of environment (development/deployment) @@ -119,7 +121,8 @@ def create_environment(self, project_id, env_type, name, dbt_version, credential def create_environment_variable(self, project_id, default, environments): """ - Create environment variable. Note: Environment variables must be prefixed with DBT_ or DBT_ENV_SECRET_ . + Create environment variable. Note: Environment variables must be prefixed with DBT_ or + DBT_ENV_SECRET_ . :param project_id: ID of the project :param environments: dict which contains the value of the variable for each environment @@ -142,7 +145,8 @@ def create_environment_variable(self, project_id, default, environments): new_env_data) return response["data"]["new_var_ids"] - def associate_connection_repository(self, name, project_id, connection_id=None, repository_id=None): + def associate_connection_repository(self, name, project_id, connection_id=None, + repository_id=None): """ Link connection and repository to project @@ -161,8 +165,9 @@ def associate_connection_repository(self, name, project_id, connection_id=None, } new_connection_data = json.dumps(new_connection) - response = self.request(f"{self.host_url}/v3/accounts/{str(self.account_id)}/projects/{str(project_id)}", - new_connection_data) + response = self.request( + f"{self.host_url}/v3/accounts/{str(self.account_id)}/projects/{str(project_id)}", + new_connection_data) return response["data"]["id"] @@ -249,15 +254,16 @@ def create_bigquery_connection( } new_connection_data = json.dumps(new_connection).encode() - response = self.request(f"{self.host_url}/v3/accounts/{self.account_id}/projects/{project_id}/connections/", - new_connection_data) + response = self.request( + f"{self.host_url}/v3/accounts/{self.account_id}/projects/{project_id}/connections/", + new_connection_data) return response["data"]["id"] def create_job(self, project_id, environment_id, schedule_cron, name): """ - Creates sample job for given project and environment. Job is triggered by the scheduler executes commands: - dbt seed, dbt test and dbt run. + Creates sample job for given project and environment. Job is triggered by the scheduler + executes commands: dbt seed, dbt test and dbt run. 
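+
+        Example (the cron expression and job name below are illustrative
+        placeholders)::
+
+            client.create_job(project_id, environment_id, "0 12 * * *", "Job - Production")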
:param project_id: ID of the project :param environment_id: ID of the environment :param schedule_cron: Schedule (cron syntax) @@ -301,6 +307,7 @@ def create_job(self, project_id, environment_id, schedule_cron, name): } job_details_data = json.dumps(job_details).encode() - response = self.request(f"{self.host_url}/v2/accounts/{self.account_id}/jobs/", job_details_data) + response = self.request(f"{self.host_url}/v2/accounts/{self.account_id}/jobs/", + job_details_data) return response["data"]["id"] From 858485594bde4cb7998e5b9c6180acbca2792f21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Dziadosz?= Date: Tue, 25 Apr 2023 16:51:18 +0200 Subject: [PATCH 07/10] Add dbt-cloud integration command to dp cli # Documentation --- data_pipelines_cli/cli_commands/dbtcloud.py | 2 +- docs/configuration.rst | 68 ++++++++++++++++++++- docs/integration.rst | 10 +++ 3 files changed, 78 insertions(+), 2 deletions(-) diff --git a/data_pipelines_cli/cli_commands/dbtcloud.py b/data_pipelines_cli/cli_commands/dbtcloud.py index ec6e974..233e723 100644 --- a/data_pipelines_cli/cli_commands/dbtcloud.py +++ b/data_pipelines_cli/cli_commands/dbtcloud.py @@ -38,7 +38,7 @@ def read_bigquery_config(env: str) -> Dict[str, Any]: "--account_id", type=int, required=True, - help="""dbt Cloud Account identifier To obtain your dbt Cloud account ID, sign into dbt Cloud + help="""dbt Cloud Account identifier To obtain your dbt Cloud account ID, sign into dbt Cloud in your browser. Take note of the number directly following the accounts path component of the URL - this is your account ID""", ) diff --git a/docs/configuration.rst b/docs/configuration.rst index 4813689..72c2160 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -375,4 +375,70 @@ Example ``looker.yml`` file might look like this: looker_project_id: my_looker_project looker_webhook_secret: "{{ env_var('LOOKER_WEBHOOK_SECRET') }}" looker_repository_branch: main - looker_instance_url: https://looker.company.com/ \ No newline at end of file + looker_instance_url: https://looker.company.com/ + +dbt Cloud configuration +++++++++++++++++++++++++++++++ + +``config/dbtcloud.yml`` contains configuration related to dbt Cloud: + +.. list-table:: + :widths: 25 20 55 + :header-rows: 1 + + * - Parameter + - Data type + - Description + * - project_name + - string + - Name of the project to be created in dbt Cloud + * - schedule_interval + - string + - The cron expression with which the example job will be run + * - default_gcp_project + - string + - GCP project that will be used by default if a new environment is created in dbt Cloud + * - environments + - Array + - Details of the environments to be created in dbt Cloud + +Configuration of the environments: + +.. list-table:: + :widths: 25 20 55 + :header-rows: 1 + + * - Parameter + - Data type + - Description + * - name + - string + - Name of the environment that will be created in dbt Cloud + * - dataset + - string + - Target dataset for this environment + * - dbt_version + - string + - The dbt version used in this environment + * - bq_config_dir + - string + - The name of the dp env directory where the bigquery configuration for the environment is located. The name of the project in GCP will be read from it. + +Example ``dbtcloud.yml`` file might look like this: + +.. 
code-block:: yaml + + project_name: "Data Pipelines Project" + schedule_interval: "0 12 * * *" + default_gcp_project: "default-project" + environments: + - name: "Develop" + dataset: "dev" + dbt_version: "1.0.0" + type: "development" + bq_config_dir: "dev" + - name: "Production" + dataset: "prod" + dbt_version: "1.0.0" + type: "deployment" + bq_config_dir: "prod" diff --git a/docs/integration.rst b/docs/integration.rst index c961b31..0a77659 100644 --- a/docs/integration.rst +++ b/docs/integration.rst @@ -71,3 +71,13 @@ Looker **dp** can generate lookML codes for your models and views, publish and deploy your `Looker `_ project +dbt Cloud +++++++++++++++++++++++++++++++++++++++++++++++ + +The `Data Pipelines CLI` can configure a project in dbt Cloud. The following functions are supported: + +- creation of a project +- adding a repository +- adding a connection to BigQuery +- creation of environments +- creation of sample jobs From 464321c25724aa59fd9f40569530e9ceacc2aef1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Dziadosz?= Date: Thu, 25 May 2023 13:18:27 +0200 Subject: [PATCH 08/10] Add dbt-cloud integration command to dp cli # CR --- data_pipelines_cli/cli_commands/dbtcloud.py | 55 +++++++++++++++------ data_pipelines_cli/cli_commands/publish.py | 4 +- data_pipelines_cli/dbt_cloud_api_client.py | 11 +++-- docs/configuration.rst | 26 ++++------ setup.py | 2 +- 5 files changed, 60 insertions(+), 38 deletions(-) diff --git a/data_pipelines_cli/cli_commands/dbtcloud.py b/data_pipelines_cli/cli_commands/dbtcloud.py index 233e723..d52b885 100644 --- a/data_pipelines_cli/cli_commands/dbtcloud.py +++ b/data_pipelines_cli/cli_commands/dbtcloud.py @@ -1,4 +1,5 @@ import json +import sys from typing import Any, Dict import click @@ -6,7 +7,8 @@ from data_pipelines_cli.dbt_cloud_api_client import DbtCloudApiClient from ..cli_constants import BUILD_DIR from ..cli_utils import echo_info -from ..config_generation import read_dictionary_from_config_directory +from ..config_generation import read_dictionary_from_config_directory, generate_profiles_yml +from ..dbt_utils import _dump_dbt_vars_from_configs_to_string, run_dbt_command def read_dbtcloud_config() -> Dict[str, Any]: @@ -33,6 +35,26 @@ def read_bigquery_config(env: str) -> Dict[str, Any]: return read_dictionary_from_config_directory(BUILD_DIR.joinpath("dag"), env, "bigquery.yml") +def resolve_env_var(unresolved_text, env): + """ + Resolves environment variables and jinja in the given text using the dbt show command. 
+ :param unresolved_text: Text to be resolved + :param env: Environment dir name + :return: Parsed text + """ + profiles_path = generate_profiles_yml(env, False) + dbt_command_result_bytes = run_dbt_command( + ("show", "--inline", f"SELECT '{unresolved_text}' AS parsed", "--output", "json"), + env, + profiles_path, + log_format_json=True, + capture_output=True) + decoded_output = dbt_command_result_bytes.stdout.decode(encoding=sys.stdout.encoding or "utf-8") + for line in map(json.loads, decoded_output.splitlines()): + if line.get('data', {}).get('node_name') == "inline_query": + return json.loads(line["data"]["preview"])[0]["parsed"] + + @click.command(name="configure-cloud", help="Create dbt Cloud project") @click.option( "--account_id", @@ -72,29 +94,33 @@ def configure_cloud_command( client = DbtCloudApiClient(f"https://cloud.getdbt.com/api", account_id, token) dbtcloud_config = read_dbtcloud_config() + base_bq_config = read_bigquery_config("base") file = open(keyfile) keyfile_data = json.load(file) - project_id = client.create_project(dbtcloud_config["project_name"]) - (repository_id, deploy_key) = client.create_repository(project_id, remote_url) + dbtcloud_project_id = client.create_project(dbtcloud_config["project_name"]) + (repository_id, deploy_key) = client.create_repository(dbtcloud_project_id, remote_url) echo_info("You need to add the generated key text as a deploy key to the target repository.\n" "This gives dbt Cloud permissions to read / write in the repository\n" f"{deploy_key}") environments_projects = {} for environment in dbtcloud_config["environments"]: - environment_id = create_environment(client, environment, project_id) + bq_config = read_bigquery_config(environment["config_dir"]) + environments_projects[environment["name"]] = resolve_env_var(bq_config["project"], + environment["config_dir"]) + environment_id = create_environment(client, environment, bq_config["dataset"], + dbtcloud_project_id) if environment["type"] == "deployment": - client.create_job(project_id, environment_id, dbtcloud_config["schedule_interval"], - "Job - " + environment["name"]) - bq_config = read_bigquery_config(environment["bq_config_dir"]) - environments_projects[environment["name"]] = bq_config["project"] + dbt_vars = _dump_dbt_vars_from_configs_to_string(environment["config_dir"]).strip() + client.create_job(dbtcloud_project_id, environment_id, environment["schedule_interval"], + "Job - " + environment["name"], dbt_vars) - client.create_environment_variable(project_id, dbtcloud_config["default_gcp_project"], + client.create_environment_variable(dbtcloud_project_id, base_bq_config["project"], environments_projects) - connection_id = create_bq_connection(client, keyfile_data, project_id) + connection_id = create_bq_connection(client, keyfile_data, dbtcloud_project_id) - client.associate_connection_repository(dbtcloud_config["project_name"], project_id, + client.associate_connection_repository(dbtcloud_config["project_name"], dbtcloud_project_id, connection_id, repository_id) @@ -111,7 +137,7 @@ def create_bq_connection(client, keyfile_data, project_id): project_id=project_id, name="BQ Connection Name", is_active=True, - gcp_project_id="{{ env_var(\"DBT_GCP_PROJECT\") }} ", + gcp_project_id="{{ env_var(\"DBT_GCP_PROJECT\") }}", timeout_seconds=100, private_key_id=keyfile_data["private_key_id"], private_key=keyfile_data["private_key"], @@ -124,17 +150,18 @@ def create_bq_connection(client, keyfile_data, project_id): ) -def create_environment(client, environment, project_id): +def 
create_environment(client, environment, dataset, project_id):
     """
     Creates a dbt Cloud environment with the specified configuration
 
     :param client: API Client
     :param environment: Config of environment to be created
     :param project_id: ID of the project
+    :param dataset: Name of target dataset
     :return: ID of created environment
     """
     if environment["type"] == "deployment":
-        credentials_id = client.create_credentials(environment["dataset"], project_id)
+        credentials_id = client.create_credentials(dataset, project_id)
     else:
         credentials_id = None
     environment_id = client.create_environment(project_id, environment["type"], environment["name"],
                                                environment["dbt_version"], credentials_id)
     return environment_id
diff --git a/data_pipelines_cli/cli_commands/publish.py b/data_pipelines_cli/cli_commands/publish.py
index cc0745a..984b245 100644
--- a/data_pipelines_cli/cli_commands/publish.py
+++ b/data_pipelines_cli/cli_commands/publish.py
@@ -5,9 +5,9 @@
 import click
 import yaml
-from dbt.contracts.graph.compiled import ManifestNode
 from dbt.contracts.graph.manifest import Manifest
-from dbt.contracts.graph.parsed import ColumnInfo
+from dbt.contracts.graph.nodes import ColumnInfo
+from dbt.contracts.graph.nodes import ManifestNode
 
 from ..cli_constants import BUILD_DIR
 from ..cli_utils import echo_info, echo_warning
diff --git a/data_pipelines_cli/dbt_cloud_api_client.py b/data_pipelines_cli/dbt_cloud_api_client.py
index 62e3cf4..729e5ee 100644
--- a/data_pipelines_cli/dbt_cloud_api_client.py
+++ b/data_pipelines_cli/dbt_cloud_api_client.py
@@ -137,7 +137,6 @@ def create_environment_variable(self, project_id, default, environments):
         new_env = {
             "env_var": env_var
         }
-        print(new_env)
         new_env_data = json.dumps(new_env)
 
         response = self.request(
@@ -260,7 +259,7 @@ def create_bigquery_connection(
 
         return response["data"]["id"]
 
-    def create_job(self, project_id, environment_id, schedule_cron, name):
+    def create_job(self, project_id, environment_id, schedule_cron, name, vars):
         """
-        Creates sample job for given project and environment. Job is triggered by the scheduler
-        executes commands: dbt seed, dbt test and dbt run.
+        Creates a sample job for the given project and environment. The job is triggered by the
+        scheduler and executes the commands: dbt seed, dbt run and dbt test.
@@ -268,8 +267,10 @@ def create_job(self, project_id, environment_id, schedule_cron, name): :param environment_id: ID of the environment :param schedule_cron: Schedule (cron syntax) :param name: Name of the job + :param vars: Variables passed to commands :return: ID of created job """ + job_details = { "account_id": self.account_id, "project_id": project_id, @@ -282,9 +283,9 @@ def create_job(self, project_id, environment_id, schedule_cron, name): "github_webhook": False }, "execute_steps": [ - "dbt seed", - "dbt test", - "dbt run" + "dbt seed --vars '" + vars + "'", + "dbt run --vars '" + vars + "'", + "dbt test --vars '" + vars + "'" ], "settings": { "threads": 1, diff --git a/docs/configuration.rst b/docs/configuration.rst index 72c2160..647b2eb 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -392,12 +392,6 @@ dbt Cloud configuration * - project_name - string - Name of the project to be created in dbt Cloud - * - schedule_interval - - string - - The cron expression with which the example job will be run - * - default_gcp_project - - string - - GCP project that will be used by default if a new environment is created in dbt Cloud * - environments - Array - Details of the environments to be created in dbt Cloud @@ -414,31 +408,31 @@ Configuration of the environments: * - name - string - Name of the environment that will be created in dbt Cloud - * - dataset + * - type - string - - Target dataset for this environment + - In dbt Cloud, there are two types of environments: deployment and development. Deployment environments determine the settings used when jobs created within that environment are executed. Development environments determine the settings used in the dbt Cloud IDE for that particular dbt Cloud Project. Each dbt Cloud project can only have a single development environment but can have any number of deployment environments. * - dbt_version - string - The dbt version used in this environment - * - bq_config_dir + * - schedule_interval + - string + - The cron expression with which the example job will be run. This setting is only needed for the deployment environment. + * - config_dir - string - - The name of the dp env directory where the bigquery configuration for the environment is located. The name of the project in GCP will be read from it. + - The name of the dp env directory where the bigquery configuration for the environment is located. The name of the project in GCP and target dataset will be read from it. Example ``dbtcloud.yml`` file might look like this: .. 
code-block:: yaml project_name: "Data Pipelines Project" - schedule_interval: "0 12 * * *" - default_gcp_project: "default-project" environments: - name: "Develop" - dataset: "dev" dbt_version: "1.0.0" type: "development" - bq_config_dir: "dev" + config_dir: "dev" - name: "Production" - dataset: "prod" dbt_version: "1.0.0" type: "deployment" - bq_config_dir: "prod" + config_dir: "prod" + schedule_interval: "0 12 * * *" diff --git a/setup.py b/setup.py index 6eeec8b..7415fb3 100644 --- a/setup.py +++ b/setup.py @@ -16,7 +16,7 @@ "fsspec==2022.11.0", "packaging==21.3", "colorama==0.4.5", - "dbt-core==1.3.1", + "dbt-core==1.5.0", ] EXTRA_FILESYSTEMS_REQUIRE = { From ec3f647280d44aaf28b19bd3e985d9dfd605f8a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Dziadosz?= Date: Thu, 25 May 2023 23:43:21 +0200 Subject: [PATCH 09/10] Add dbt-cloud integration command to dp cli # remove resolving jinja / env vars --- data_pipelines_cli/cli_commands/dbtcloud.py | 28 +++------------------ data_pipelines_cli/cli_commands/publish.py | 4 +-- setup.py | 2 +- 3 files changed, 6 insertions(+), 28 deletions(-) diff --git a/data_pipelines_cli/cli_commands/dbtcloud.py b/data_pipelines_cli/cli_commands/dbtcloud.py index d52b885..c83a809 100644 --- a/data_pipelines_cli/cli_commands/dbtcloud.py +++ b/data_pipelines_cli/cli_commands/dbtcloud.py @@ -1,5 +1,4 @@ import json -import sys from typing import Any, Dict import click @@ -7,8 +6,8 @@ from data_pipelines_cli.dbt_cloud_api_client import DbtCloudApiClient from ..cli_constants import BUILD_DIR from ..cli_utils import echo_info -from ..config_generation import read_dictionary_from_config_directory, generate_profiles_yml -from ..dbt_utils import _dump_dbt_vars_from_configs_to_string, run_dbt_command +from ..config_generation import read_dictionary_from_config_directory +from ..dbt_utils import _dump_dbt_vars_from_configs_to_string def read_dbtcloud_config() -> Dict[str, Any]: @@ -35,26 +34,6 @@ def read_bigquery_config(env: str) -> Dict[str, Any]: return read_dictionary_from_config_directory(BUILD_DIR.joinpath("dag"), env, "bigquery.yml") -def resolve_env_var(unresolved_text, env): - """ - Resolves environment variables and jinja in the given text using the dbt show command. 
- :param unresolved_text: Text to be resolved - :param env: Environment dir name - :return: Parsed text - """ - profiles_path = generate_profiles_yml(env, False) - dbt_command_result_bytes = run_dbt_command( - ("show", "--inline", f"SELECT '{unresolved_text}' AS parsed", "--output", "json"), - env, - profiles_path, - log_format_json=True, - capture_output=True) - decoded_output = dbt_command_result_bytes.stdout.decode(encoding=sys.stdout.encoding or "utf-8") - for line in map(json.loads, decoded_output.splitlines()): - if line.get('data', {}).get('node_name') == "inline_query": - return json.loads(line["data"]["preview"])[0]["parsed"] - - @click.command(name="configure-cloud", help="Create dbt Cloud project") @click.option( "--account_id", @@ -106,8 +85,7 @@ def configure_cloud_command( environments_projects = {} for environment in dbtcloud_config["environments"]: bq_config = read_bigquery_config(environment["config_dir"]) - environments_projects[environment["name"]] = resolve_env_var(bq_config["project"], - environment["config_dir"]) + environments_projects[environment["name"]] = bq_config["project"] environment_id = create_environment(client, environment, bq_config["dataset"], dbtcloud_project_id) if environment["type"] == "deployment": diff --git a/data_pipelines_cli/cli_commands/publish.py b/data_pipelines_cli/cli_commands/publish.py index 984b245..cc0745a 100644 --- a/data_pipelines_cli/cli_commands/publish.py +++ b/data_pipelines_cli/cli_commands/publish.py @@ -5,9 +5,9 @@ import click import yaml +from dbt.contracts.graph.compiled import ManifestNode from dbt.contracts.graph.manifest import Manifest -from dbt.contracts.graph.nodes import ColumnInfo -from dbt.contracts.graph.nodes import ManifestNode +from dbt.contracts.graph.parsed import ColumnInfo from ..cli_constants import BUILD_DIR from ..cli_utils import echo_info, echo_warning diff --git a/setup.py b/setup.py index 7415fb3..6eeec8b 100644 --- a/setup.py +++ b/setup.py @@ -16,7 +16,7 @@ "fsspec==2022.11.0", "packaging==21.3", "colorama==0.4.5", - "dbt-core==1.5.0", + "dbt-core==1.3.1", ] EXTRA_FILESYSTEMS_REQUIRE = { From becddfe9439929a5456f3158206750977ffae206 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Dziadosz?= Date: Sun, 18 Jun 2023 21:50:58 +0200 Subject: [PATCH 10/10] Add dbt-cloud integration command to dp cli # code formatting --- data_pipelines_cli/cli.py | 2 +- data_pipelines_cli/cli_commands/dbtcloud.py | 73 +++++---- data_pipelines_cli/dbt_cloud_api_client.py | 171 ++++++++++---------- 3 files changed, 131 insertions(+), 115 deletions(-) diff --git a/data_pipelines_cli/cli.py b/data_pipelines_cli/cli.py index 17bb6b1..799b03d 100644 --- a/data_pipelines_cli/cli.py +++ b/data_pipelines_cli/cli.py @@ -5,6 +5,7 @@ from .cli_commands.clean import clean_command from .cli_commands.compile import compile_project_command from .cli_commands.create import create_command +from .cli_commands.dbtcloud import configure_cloud_command from .cli_commands.deploy import deploy_command from .cli_commands.docs import docs_command from .cli_commands.generate.generate import generate_group @@ -16,7 +17,6 @@ from .cli_commands.template import list_templates_command from .cli_commands.test import test_command from .cli_commands.update import update_command -from .cli_commands.dbtcloud import configure_cloud_command from .cli_utils import echo_error, echo_suberror from .errors import DataPipelinesError diff --git a/data_pipelines_cli/cli_commands/dbtcloud.py b/data_pipelines_cli/cli_commands/dbtcloud.py index c83a809..92f3a60 
diff --git a/data_pipelines_cli/cli_commands/dbtcloud.py b/data_pipelines_cli/cli_commands/dbtcloud.py
index c83a809..92f3a60 100644
--- a/data_pipelines_cli/cli_commands/dbtcloud.py
+++ b/data_pipelines_cli/cli_commands/dbtcloud.py
@@ -4,6 +4,7 @@
 import click
 
 from data_pipelines_cli.dbt_cloud_api_client import DbtCloudApiClient
+
 from ..cli_constants import BUILD_DIR
 from ..cli_utils import echo_info
 from ..config_generation import read_dictionary_from_config_directory
@@ -57,20 +58,11 @@ def read_bigquery_config(env: str) -> Dict[str, Any]:
     required=True,
     help="""Git stores remote URL
     Note: After creating a dbt Cloud repository's SSH key, you will need to add the generated
     key text as a deploy key to the target repository. This gives dbt
-    Cloud permissions to read / write in the repository."""
-)
-@click.option(
-    "--keyfile",
-    type=str,
-    required=True,
-    help="Bigquery keyfile"
+    Cloud permissions to read / write in the repository.""",
 )
-def configure_cloud_command(
-    account_id: int,
-    token: str,
-    remote_url: str,
-    keyfile: str) -> None:
-    client = DbtCloudApiClient(f"https://cloud.getdbt.com/api", account_id, token)
+@click.option("--keyfile", type=str, required=True, help="Bigquery keyfile")
+def configure_cloud_command(account_id: int, token: str, remote_url: str, keyfile: str) -> None:
+    client = DbtCloudApiClient("https://cloud.getdbt.com/api", account_id, token)
 
     dbtcloud_config = read_dbtcloud_config()
     base_bq_config = read_bigquery_config("base")
@@ -78,31 +70,43 @@ def configure_cloud_command(
     keyfile_data = json.load(file)
 
     dbtcloud_project_id = client.create_project(dbtcloud_config["project_name"])
     (repository_id, deploy_key) = client.create_repository(dbtcloud_project_id, remote_url)
-    echo_info("You need to add the generated key text as a deploy key to the target repository.\n"
-              "This gives dbt Cloud permissions to read / write in the repository\n"
-              f"{deploy_key}")
+    echo_info(
+        "You need to add the generated key text as a deploy key to the target repository.\n"
+        "This gives dbt Cloud permissions to read / write in the repository\n"
+        f"{deploy_key}"
+    )
 
     environments_projects = {}
     for environment in dbtcloud_config["environments"]:
         bq_config = read_bigquery_config(environment["config_dir"])
         environments_projects[environment["name"]] = bq_config["project"]
-        environment_id = create_environment(client, environment, bq_config["dataset"],
-                                            dbtcloud_project_id)
+        environment_id = create_environment(
+            client, environment, bq_config["dataset"], dbtcloud_project_id
+        )
         if environment["type"] == "deployment":
             dbt_vars = _dump_dbt_vars_from_configs_to_string(environment["config_dir"]).strip()
-            client.create_job(dbtcloud_project_id, environment_id, environment["schedule_interval"],
-                              "Job - " + environment["name"], dbt_vars)
-
-    client.create_environment_variable(dbtcloud_project_id, base_bq_config["project"],
-                                       environments_projects)
+            client.create_job(
+                dbtcloud_project_id,
+                environment_id,
+                environment["schedule_interval"],
+                "Job - " + environment["name"],
+                dbt_vars,
+            )
+
+    client.create_environment_variable(
+        dbtcloud_project_id, base_bq_config["project"], environments_projects
+    )
 
     connection_id = create_bq_connection(client, keyfile_data, dbtcloud_project_id)
-    client.associate_connection_repository(dbtcloud_config["project_name"], dbtcloud_project_id,
-                                           connection_id, repository_id)
+    client.associate_connection_repository(
+        dbtcloud_config["project_name"], dbtcloud_project_id, connection_id, repository_id
+    )
 
 
-def create_bq_connection(client, keyfile_data, project_id):
+def create_bq_connection(
+    client: DbtCloudApiClient, keyfile_data: Dict[str, str], project_id: int
+) -> int:
     """
     Creates a connection to the bigquery warehouse in the dbt Cloud project.
 
@@ -115,7 +119,7 @@ def create_bq_connection(client, keyfile_data, project_id):
         project_id=project_id,
         name="BQ Connection Name",
         is_active=True,
-        gcp_project_id="{{ env_var(\"DBT_GCP_PROJECT\") }}",
+        gcp_project_id='{{ env_var("DBT_GCP_PROJECT") }}',
         timeout_seconds=100,
         private_key_id=keyfile_data["private_key_id"],
         private_key=keyfile_data["private_key"],
@@ -124,11 +128,13 @@ def create_bq_connection(client, keyfile_data, project_id):
         auth_uri=keyfile_data["auth_uri"],
         token_uri=keyfile_data["token_uri"],
         auth_provider_x509_cert_url=keyfile_data["auth_provider_x509_cert_url"],
-        client_x509_cert_url=keyfile_data["client_x509_cert_url"]
+        client_x509_cert_url=keyfile_data["client_x509_cert_url"],
     )
 
 
-def create_environment(client, environment, dataset, project_id):
+def create_environment(
+    client: DbtCloudApiClient, environment: Dict[str, str], dataset: str, project_id: int
+) -> int:
     """
     Creates a dbt Cloud environment with the specified configuration
 
@@ -142,6 +148,11 @@ def create_environment(client, environment, dataset, project_id):
         credentials_id = client.create_credentials(dataset, project_id)
     else:
         credentials_id = None
-    environment_id = client.create_environment(project_id, environment["type"], environment["name"],
-                                               environment["dbt_version"], credentials_id)
+    environment_id = client.create_environment(
+        project_id,
+        environment["type"],
+        environment["name"],
+        environment["dbt_version"],
+        credentials_id,
+    )
     return environment_id
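For reference, configure_cloud_command assumes a configuration shaped roughly as follows.
The keys are inferred from the dictionary accesses in the code above; every value is an
illustrative placeholder, not the project's actual configuration:

# Shape assumed by configure_cloud_command (keys inferred, values illustrative):
dbtcloud_config = {
    "project_name": "Data Pipelines Project",
    "environments": [
        {
            "name": "dev",
            "config_dir": "dev",            # directory passed to read_bigquery_config()
            "type": "development",          # "development" environments get credentials
            "dbt_version": "1.3.0-latest",  # hypothetical version string
        },
        {
            "name": "prod",
            "config_dir": "prod",
            "type": "deployment",           # "deployment" environments also get a job
            "schedule_interval": "0 */12 * * *",  # cron passed to create_job()
            "dbt_version": "1.3.0-latest",
        },
    ],
}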
diff --git a/data_pipelines_cli/dbt_cloud_api_client.py b/data_pipelines_cli/dbt_cloud_api_client.py
index 729e5ee..4354240 100644
--- a/data_pipelines_cli/dbt_cloud_api_client.py
+++ b/data_pipelines_cli/dbt_cloud_api_client.py
@@ -1,11 +1,13 @@
 import json
+from typing import Any, Dict, Optional, Tuple
+
 import requests
 
 
 class DbtCloudApiClient:
     """A class used to create dbt Cloud project using API v3"""
 
-    def __init__(self, host_url, account_id, token):
+    def __init__(self, host_url: str, account_id: int, token: str) -> None:
         self.host_url = host_url
         """Base URL differs for Multi and Single-Tenant Deployments"""
 
@@ -17,26 +19,25 @@ def __init__(self, host_url, account_id, token):
         3. In the address bar, the number after /users is your user ID.
         """
 
+        self.api_v3_url = f"{self.host_url}/v3/accounts/{str(self.account_id)}/projects"
+        """Url of API v3 (used for managing resources in dbt Cloud)"""
+
         self.token = token
         """You can find your User API token in the Profile page under the API Access label"""
 
-    def request(self, url, data):
+    def request(self, url: str, data: Any) -> Any:
         headers = {
             "Accept": "application/json",
             "Content-Type": "application/json",
-            "Authorization": f"Token {self.token}"
+            "Authorization": f"Token {self.token}",
         }
-        response = requests.post(
-            url=url,
-            data=data,
-            headers=headers
-        )
+        response = requests.post(url=url, data=data, headers=headers)
         res = json.loads(response.content)
         if not res["status"]["is_success"]:
             raise Exception(res["status"]["user_message"] + "\n" + res["data"])
         return res
 
-    def create_project(self, name):
+    def create_project(self, name: str) -> int:
         """
         Note: the dbt_project_subdirectory is an optional field which allows you to point
         dbt Cloud to a subdirectory that lives within the root folder of your target repository.
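Every endpoint below funnels through request(), which assumes the standard dbt Cloud
response envelope: a "status" object carrying "is_success" and "user_message", plus the
created resource under "data". A successful create_project response, abridged and with
made-up values, looks roughly like:

{
    "status": {"is_success": True, "user_message": "Success!"},
    "data": {"id": 271212, "name": "Data Pipelines Project"},
}

This is why each method can simply return response["data"]["id"] and why failures surface
as an exception built from "user_message".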
@@ -51,16 +52,15 @@ def create_project(self, name):
             "name": name,
             "dbt_project_subdirectory": None,
             "connection_id": None,
-            "repository_id": None
+            "repository_id": None,
         }
         new_project_data = json.dumps(new_project)
 
-        response = self.request(f"{self.host_url}/v3/accounts/{str(self.account_id)}/projects/",
-                                new_project_data)
+        response = self.request(f"{self.api_v3_url}/", new_project_data)
 
         return response["data"]["id"]
 
-    def create_repository(self, project_id, git_clone_url):
+    def create_repository(self, project_id: int, git_clone_url: str) -> Tuple[int, str]:
         """
         Note: After creating a dbt Cloud repository's SSH key, you will need to add the generated
         key text as a deploy key to the target repository. This gives dbt Cloud permissions to
@@ -76,17 +76,25 @@ def create_repository(self, project_id, git_clone_url):
             "remote_url": git_clone_url,
             "git_clone_strategy": "deploy_key",
             "github_installation_id": None,
-            "token_str": None
+            "token_str": None,
         }
         new_repository_data = json.dumps(new_repository)
 
         response = self.request(
-            f"{self.host_url}/v3/accounts/{str(self.account_id)}/projects/{str(project_id)}/repositories/",
-            new_repository_data)
+            f"{self.api_v3_url}/{str(project_id)}/repositories/",
+            new_repository_data,
+        )
 
         return response["data"]["id"], response["data"]["deploy_key"]["public_key"]
 
-    def create_environment(self, project_id, env_type, name, dbt_version, credentials_id=None):
+    def create_environment(
+        self,
+        project_id: int,
+        env_type: str,
+        name: str,
+        dbt_version: str,
+        credentials_id: Optional[int] = None,
+    ) -> int:
         """
         Create environment. Environments encompass a collection of settings for how you want
         to run your dbt project. This includes: dbt version, git branch, data location
         (target schema).
@@ -109,17 +117,20 @@ def create_environment(self, project_id, env_type, name, dbt_version, credential
             "custom_branch": None,
             "dbt_version": dbt_version,
             "supports_docs": False,
-            "credentials_id": credentials_id
+            "credentials_id": credentials_id,
         }
         new_env_data = json.dumps(new_env)
 
         response = self.request(
-            f"{self.host_url}/v3/accounts/{str(self.account_id)}/projects/{str(project_id)}/environments/",
-            new_env_data)
+            f"{self.api_v3_url}/{str(project_id)}/environments/",
+            new_env_data,
+        )
 
         return response["data"]["id"]
 
-    def create_environment_variable(self, project_id, default, environments):
+    def create_environment_variable(
+        self, project_id: int, default: str, environments: Dict[str, str]
+    ) -> int:
         """
         Create environment variable.
         Note: Environment variables must be prefixed with DBT_ or DBT_ENV_SECRET_.
@@ -129,23 +140,24 @@ def create_environment_variable(self, project_id, default, environments):
         :param default: default environment variable value for project
         :return: IDs of created environment variable
         """
-        env_var = {
-            "new_name": "DBT_GCP_PROJECT",
-            "project": default
-        }
+        env_var = {"new_name": "DBT_GCP_PROJECT", "project": default}
         env_var.update(environments)
 
-        new_env = {
-            "env_var": env_var
-        }
+        new_env = {"env_var": env_var}
 
         new_env_data = json.dumps(new_env)
         response = self.request(
-            f"{self.host_url}/v3/accounts/{str(self.account_id)}/projects/{str(project_id)}/environment-variables/bulk/",
-            new_env_data)
+            f"{self.api_v3_url}/{str(project_id)}/environment-variables/bulk/",
+            new_env_data,
+        )
 
         return response["data"]["new_var_ids"]
 
-    def associate_connection_repository(self, name, project_id, connection_id=None,
-                                        repository_id=None):
+    def associate_connection_repository(
+        self,
+        name: str,
+        project_id: int,
+        connection_id: Optional[int] = None,
+        repository_id: Optional[int] = None,
+    ) -> int:
         """
         Link connection and repository to project
 
@@ -160,17 +172,18 @@ def associate_connection_repository(self, name, project_id, connection_id=None,
             "account_id": self.account_id,
             "id": project_id,
             "connection_id": connection_id,
-            "repository_id": repository_id
+            "repository_id": repository_id,
         }
         new_connection_data = json.dumps(new_connection)
 
         response = self.request(
-            f"{self.host_url}/v3/accounts/{str(self.account_id)}/projects/{str(project_id)}",
-            new_connection_data)
+            f"{self.api_v3_url}/{str(project_id)}",
+            new_connection_data,
+        )
 
         return response["data"]["id"]
 
-    def create_credentials(self, schema, project_id):
+    def create_credentials(self, schema: str, project_id: int) -> int:
         """
         Creates credentials - these are needed to create the environment.
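One detail worth calling out in the bulk environment-variables call above: the payload
carries a project-wide default under the "project" key plus one entry per dbt Cloud
environment name, which is exactly what env_var.update(environments) produces. With
illustrative values, the request body ends up shaped like this:

{
    "env_var": {
        "new_name": "DBT_GCP_PROJECT",
        "project": "base-gcp-project",  # project-wide default
        "dev": "dev-gcp-project",       # one key per dbt Cloud environment name
        "prod": "prod-gcp-project",
    }
}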
@@ -190,35 +203,36 @@ def create_credentials(self, schema, project_id):
             "created_at": None,
             "updated_at": None,
             "username": None,
-            "has_refresh_token": False
+            "has_refresh_token": False,
         }
         new_credentials_data = json.dumps(new_credentials)
 
         response = self.request(
-            f"{self.host_url}/v3/accounts/{str(self.account_id)}/projects/{str(project_id)}/credentials/",
-            new_credentials_data)
+            f"{self.api_v3_url}/{str(project_id)}/credentials/",
+            new_credentials_data,
+        )
 
         return response["data"]["id"]
 
     def create_bigquery_connection(
-            self,
-            project_id,
-            name,
-            is_active,
-            gcp_project_id,
-            timeout_seconds,
-            private_key_id,
-            private_key,
-            client_email,
-            client_id,
-            auth_uri,
-            token_uri,
-            auth_provider_x509_cert_url,
-            client_x509_cert_url,
-            retries=1,
-            location=None,
-            maximum_bytes_billed=0
-    ):
+        self,
+        project_id: int,
+        name: str,
+        is_active: bool,
+        gcp_project_id: str,
+        timeout_seconds: int,
+        private_key_id: str,
+        private_key: str,
+        client_email: str,
+        client_id: str,
+        auth_uri: str,
+        token_uri: str,
+        auth_provider_x509_cert_url: str,
+        client_x509_cert_url: str,
+        retries: int = 1,
+        location: Optional[str] = None,
+        maximum_bytes_billed: int = 0,
+    ) -> int:
         """
         Creates a dbt Cloud connection to BigQuery
 
         :param project_id: ID of the project
@@ -239,7 +253,7 @@ def create_bigquery_connection(
             "client_x509_cert_url": client_x509_cert_url,
             "retries": retries,
             "location": location,
-            "maximum_bytes_billed": maximum_bytes_billed
+            "maximum_bytes_billed": maximum_bytes_billed,
         }
 
         new_connection = {
@@ -254,12 +268,15 @@ def create_bigquery_connection(
 
         new_connection_data = json.dumps(new_connection).encode()
         response = self.request(
-            f"{self.host_url}/v3/accounts/{self.account_id}/projects/{project_id}/connections/",
-            new_connection_data)
+            f"{self.api_v3_url}/{project_id}/connections/",
+            new_connection_data,
+        )
 
         return response["data"]["id"]
 
-    def create_job(self, project_id, environment_id, schedule_cron, name, vars):
+    def create_job(
+        self, project_id: int, environment_id: int, schedule_cron: str, name: str, vars: str
+    ) -> int:
         """
         Creates a sample job for the given project and environment. The job is triggered by
         the scheduler and executes the commands: dbt seed, dbt run and dbt test.
@@ -278,37 +295,25 @@ def create_job(self, project_id, environment_id, schedule_cron, name, vars):
             "environment_id": environment_id,
             "name": name,
             "dbt_version": None,
-            "triggers": {
-                "schedule": True,
-                "github_webhook": False
-            },
+            "triggers": {"schedule": True, "github_webhook": False},
             "execute_steps": [
                 "dbt seed --vars '" + vars + "'",
                 "dbt run --vars '" + vars + "'",
-                "dbt test --vars '" + vars + "'"
+                "dbt test --vars '" + vars + "'",
             ],
-            "settings": {
-                "threads": 1,
-                "target_name": "default"
-            },
-            "execution": {
-                "timeout_seconds": 600
-            },
+            "settings": {"threads": 1, "target_name": "default"},
+            "execution": {"timeout_seconds": 600},
             "state": 1,
             "schedule": {
                 "cron": schedule_cron,
-                "date": {
-                    "type": "every_day"
-                },
-                "time": {
-                    "type": "every_hour",
-                    "interval": 1
-                }
-            }
+                "date": {"type": "every_day"},
+                "time": {"type": "every_hour", "interval": 1},
+            },
         }
 
         job_details_data = json.dumps(job_details).encode()
-        response = self.request(f"{self.host_url}/v2/accounts/{self.account_id}/jobs/",
-                                job_details_data)
+        response = self.request(
+            f"{self.host_url}/v2/accounts/{self.account_id}/jobs/", job_details_data
+        )
 
         return response["data"]["id"]
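Taken together, the client supports the end-to-end flow below, mirroring what
configure_cloud_command does. This is a minimal sketch, not part of the patch: the account
id, token, repository URL, dataset, dbt version and keyfile path are placeholders, while
the class and method signatures come from dbt_cloud_api_client.py above.

import json

from data_pipelines_cli.dbt_cloud_api_client import DbtCloudApiClient

# Placeholders: substitute a real account id, token, repository and keyfile.
client = DbtCloudApiClient("https://cloud.getdbt.com/api", 12345, "<user-api-token>")

# Create the project and attach the git repository (deploy-key strategy).
project_id = client.create_project("Data Pipelines Project")
repository_id, deploy_key = client.create_repository(
    project_id, "git@github.com:acme/data-pipelines.git"
)
print(f"Add this deploy key to the repository:\n{deploy_key}")

# Development environments need credentials pointing at a target dataset/schema.
credentials_id = client.create_credentials("analytics_dev", project_id)
environment_id = client.create_environment(
    project_id, "development", "dev", "1.3.0-latest", credentials_id
)

# One DBT_GCP_PROJECT value per environment, plus a project-wide default.
client.create_environment_variable(
    project_id, "base-gcp-project", {"dev": "dev-gcp-project"}
)

# The connection resolves the GCP project from the variable set above; the
# remaining fields come straight from a service-account keyfile.
with open("service-account-keyfile.json") as keyfile:
    keyfile_data = json.load(keyfile)
connection_id = client.create_bigquery_connection(
    project_id=project_id,
    name="BQ Connection Name",
    is_active=True,
    gcp_project_id='{{ env_var("DBT_GCP_PROJECT") }}',
    timeout_seconds=100,
    private_key_id=keyfile_data["private_key_id"],
    private_key=keyfile_data["private_key"],
    client_email=keyfile_data["client_email"],
    client_id=keyfile_data["client_id"],
    auth_uri=keyfile_data["auth_uri"],
    token_uri=keyfile_data["token_uri"],
    auth_provider_x509_cert_url=keyfile_data["auth_provider_x509_cert_url"],
    client_x509_cert_url=keyfile_data["client_x509_cert_url"],
)
client.associate_connection_repository(
    "Data Pipelines Project", project_id, connection_id, repository_id
)

# In the command itself, jobs are only created for "deployment" environments;
# shown here for completeness with an illustrative cron and empty dbt vars.
client.create_job(project_id, environment_id, "0 */12 * * *", "Job - dev", "{}")

Note the API split the patch preserves: project, repository, environment, credential and
connection management goes through the v3 api_v3_url, while job creation still posts to
the v2 /jobs/ endpoint.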