diff --git a/data_pipelines_cli/cli.py b/data_pipelines_cli/cli.py
index c6845e9..799b03d 100644
--- a/data_pipelines_cli/cli.py
+++ b/data_pipelines_cli/cli.py
@@ -5,6 +5,7 @@
 from .cli_commands.clean import clean_command
 from .cli_commands.compile import compile_project_command
 from .cli_commands.create import create_command
+from .cli_commands.dbtcloud import configure_cloud_command
 from .cli_commands.deploy import deploy_command
 from .cli_commands.docs import docs_command
 from .cli_commands.generate.generate import generate_group
@@ -50,3 +51,4 @@ def cli() -> None:
 _cli.add_command(seed_command)
 _cli.add_command(test_command)
 _cli.add_command(update_command)
+_cli.add_command(configure_cloud_command)
diff --git a/data_pipelines_cli/cli_commands/dbtcloud.py b/data_pipelines_cli/cli_commands/dbtcloud.py
new file mode 100644
index 0000000..92f3a60
--- /dev/null
+++ b/data_pipelines_cli/cli_commands/dbtcloud.py
@@ -0,0 +1,156 @@
+import json
+from typing import Any, Dict
+
+import click
+
+from data_pipelines_cli.dbt_cloud_api_client import DbtCloudApiClient
+
+from ..cli_constants import BUILD_DIR
+from ..cli_utils import echo_info
+from ..config_generation import read_dictionary_from_config_directory
+from ..dbt_utils import _dump_dbt_vars_from_configs_to_string
+
+
+def read_dbtcloud_config() -> Dict[str, Any]:
+    """
+    Read dbt Cloud configuration.
+
+    :return: Compiled dictionary
+    :rtype: Dict[str, Any]
+    """
+    return read_dictionary_from_config_directory(BUILD_DIR.joinpath("dag"), ".", "dbtcloud.yml")
+
+
+def read_bigquery_config(env: str) -> Dict[str, Any]:
+    """
+    Read BigQuery configuration for the given environment.
+
+    :param env: Name of the environment
+    :type env: str
+    :return: Compiled dictionary
+    :rtype: Dict[str, Any]
+    """
+    return read_dictionary_from_config_directory(BUILD_DIR.joinpath("dag"), env, "bigquery.yml")
+
+
+@click.command(name="configure-cloud", help="Create dbt Cloud project")
+@click.option(
+    "--account_id",
+    type=int,
+    required=True,
+    help="""dbt Cloud account identifier. To obtain your dbt Cloud account ID, sign into dbt Cloud
+    in your browser. Take note of the number directly following the accounts path component of the
+    URL - this is your account ID""",
+)
+@click.option(
+    "--token",
+    type=str,
+    required=True,
+    help="""API token for your dbt Cloud account. You can retrieve your User API token from your
+    User Profile (top right icon) > API Access. You can also use a Service Token. Retrieve it from
+    Account Settings > Service Tokens > Create Service Token.""",
+)
+@click.option(
+    "--remote_url",
+    type=str,
+    required=True,
+    help="""Git repository remote URL. Note: After creating a dbt Cloud repository's SSH key, you will
+    need to add the generated key text as a deploy key to the target repository.
This gives dbt + Cloud permissions to read / write in the repository.""", +) +@click.option("--keyfile", type=str, required=True, help="Bigquery keyfile") +def configure_cloud_command(account_id: int, token: str, remote_url: str, keyfile: str) -> None: + client = DbtCloudApiClient("https://cloud.getdbt.com/api", account_id, token) + + dbtcloud_config = read_dbtcloud_config() + base_bq_config = read_bigquery_config("base") + file = open(keyfile) + keyfile_data = json.load(file) + dbtcloud_project_id = client.create_project(dbtcloud_config["project_name"]) + (repository_id, deploy_key) = client.create_repository(dbtcloud_project_id, remote_url) + echo_info( + "You need to add the generated key text as a deploy key to the target repository.\n" + "This gives dbt Cloud permissions to read / write in the repository\n" + f"{deploy_key}" + ) + + environments_projects = {} + for environment in dbtcloud_config["environments"]: + bq_config = read_bigquery_config(environment["config_dir"]) + environments_projects[environment["name"]] = bq_config["project"] + environment_id = create_environment( + client, environment, bq_config["dataset"], dbtcloud_project_id + ) + if environment["type"] == "deployment": + dbt_vars = _dump_dbt_vars_from_configs_to_string(environment["config_dir"]).strip() + client.create_job( + dbtcloud_project_id, + environment_id, + environment["schedule_interval"], + "Job - " + environment["name"], + dbt_vars, + ) + + client.create_environment_variable( + dbtcloud_project_id, base_bq_config["project"], environments_projects + ) + + connection_id = create_bq_connection(client, keyfile_data, dbtcloud_project_id) + + client.associate_connection_repository( + dbtcloud_config["project_name"], dbtcloud_project_id, connection_id, repository_id + ) + + +def create_bq_connection( + client: DbtCloudApiClient, keyfile_data: Dict[str, str], project_id: int +) -> int: + """ + Creates a connection to the bigquery warehouse in the dbt Cloud project. 
+ + :param client: API Client + :param keyfile_data: Data read from Bigquery keyfile + :param project_id: ID of the project in which the connection is to be created + :return: ID of the created connection + """ + return client.create_bigquery_connection( + project_id=project_id, + name="BQ Connection Name", + is_active=True, + gcp_project_id='{{ env_var("DBT_GCP_PROJECT") }}', + timeout_seconds=100, + private_key_id=keyfile_data["private_key_id"], + private_key=keyfile_data["private_key"], + client_email=keyfile_data["client_email"], + client_id=keyfile_data["client_id"], + auth_uri=keyfile_data["auth_uri"], + token_uri=keyfile_data["token_uri"], + auth_provider_x509_cert_url=keyfile_data["auth_provider_x509_cert_url"], + client_x509_cert_url=keyfile_data["client_x509_cert_url"], + ) + + +def create_environment( + client: DbtCloudApiClient, environment: Dict[str, str], dataset: str, project_id: int +) -> int: + """ + Creates a dbt Cloud environment with the specified configuration + + :param client: API Client + :param environment: Config of environment to be created + :param project_id: ID of the project + :param dataset: Name of target dataset + :return: ID of created environment + """ + if environment["type"] == "deployment": + credentials_id = client.create_credentials(dataset, project_id) + else: + credentials_id = None + environment_id = client.create_environment( + project_id, + environment["type"], + environment["name"], + environment["dbt_version"], + credentials_id, + ) + return environment_id diff --git a/data_pipelines_cli/dbt_cloud_api_client.py b/data_pipelines_cli/dbt_cloud_api_client.py new file mode 100644 index 0000000..4354240 --- /dev/null +++ b/data_pipelines_cli/dbt_cloud_api_client.py @@ -0,0 +1,319 @@ +import json +from typing import Any, Dict, Optional, Tuple + +import requests + + +class DbtCloudApiClient: + """A class used to create dbt Cloud project using API v3""" + + def __init__(self, host_url: str, account_id: int, token: str) -> 
None: + self.host_url = host_url + """Base URL differs for Multi and Single-Tenant Deployments""" + + self.account_id = account_id + """ + To find your user ID in dbt Cloud, read the following steps: + 1. Go to Account Settings, Team, and then Users, + 2. Select your user, + 3. In the address bar, the number after /users is your user ID. + """ + + self.api_v3_url = f"{self.host_url}/v3/accounts/{str(self.account_id)}/projects" + """Url of API v3 (used for managing resources in dbt Cloud)""" + + self.token = token + """You can find your User API token in the Profile page under the API Access label""" + + def request(self, url: str, data: Any) -> Any: + headers = { + "Accept": "application/json", + "Content-Type": "application/json", + "Authorization": f"Token {self.token}", + } + response = requests.post(url=url, data=data, headers=headers) + res = json.loads(response.content) + if not res["status"]["is_success"]: + raise Exception(res["status"]["user_message"] + "\n" + res["data"]) + return res + + def create_project(self, name: str) -> int: + """ + Note: the dbt_project_subdirectory is an optional field which allows you to point + dbt Cloud to a subdirectory that lives within the root folder of your target repository. + This means dbt Cloud will look for a dbt_project.yml file at that location. + + :param name: Name of the project + :return: ID of created project + """ + new_project = { + "id": None, + "account_id": self.account_id, + "name": name, + "dbt_project_subdirectory": None, + "connection_id": None, + "repository_id": None, + } + + new_project_data = json.dumps(new_project) + + response = self.request(f"{self.api_v3_url}/", new_project_data) + return response["data"]["id"] + + def create_repository(self, project_id: int, git_clone_url: str) -> Tuple[int, str]: + """ + Note: After creating a dbt Cloud repository's SSH key, you will need to add the generated + key text as a deploy key to the target repository. 
This gives dbt Cloud permissions to + read / write in the repository. + + :param git_clone_url: Repository remote url + :param project_id: ID of the project + :return: repository ID and deploy key + """ + new_repository = { + "account_id": self.account_id, + "project_id": project_id, + "remote_url": git_clone_url, + "git_clone_strategy": "deploy_key", + "github_installation_id": None, + "token_str": None, + } + + new_repository_data = json.dumps(new_repository) + + response = self.request( + f"{self.api_v3_url}/{str(project_id)}/repositories/", + new_repository_data, + ) + return response["data"]["id"], response["data"]["deploy_key"]["public_key"] + + def create_environment( + self, + project_id: int, + env_type: str, + name: str, + dbt_version: str, + credentials_id: Optional[int] = None, + ) -> int: + """ + Create environment. Environments encompass a collection of settings for how you want to run + your dbt project. This includes: dbt version, git branch, data location (target schema). + + :param name: Name of the environment + :param env_type: type of environment (development/deployment) + :param project_id: ID of the project + :param credentials_id: ID of credentials to be used by environment + :param dbt_version: dbt version that should be used by this environment + :return: ID of created environment + """ + new_env = { + "id": None, + "type": env_type, + "name": name, + "account_id": self.account_id, + "project_id": project_id, + "state": 1, + "use_custom_branch": False, + "custom_branch": None, + "dbt_version": dbt_version, + "supports_docs": False, + "credentials_id": credentials_id, + } + + new_env_data = json.dumps(new_env) + + response = self.request( + f"{self.api_v3_url}/{str(project_id)}/environments/", + new_env_data, + ) + return response["data"]["id"] + + def create_environment_variable( + self, project_id: int, default: str, environments: Dict[str, str] + ) -> int: + """ + Create environment variable. 
Note: Environment variables must be prefixed with DBT_ or + DBT_ENV_SECRET_ . + + :param project_id: ID of the project + :param environments: dict which contains the value of the variable for each environment + :param default: default environment variable value for project + :return: IDs of created environment variable + """ + env_var = {"new_name": "DBT_GCP_PROJECT", "project": default} + env_var.update(environments) + new_env = {"env_var": env_var} + new_env_data = json.dumps(new_env) + + response = self.request( + f"{self.api_v3_url}/{str(project_id)}/environment-variables/bulk/", + new_env_data, + ) + return response["data"]["new_var_ids"] + + def associate_connection_repository( + self, + name: str, + project_id: int, + connection_id: Optional[int] = None, + repository_id: Optional[int] = None, + ) -> int: + """ + Link connection and repository to project + + :param name: Name of the project + :param project_id: ID of the project + :param connection_id: ID of the connection to be associated + :param repository_id: ID of the repository to be associated + :return: ID of the project + """ + new_connection = { + "name": name, + "account_id": self.account_id, + "id": project_id, + "connection_id": connection_id, + "repository_id": repository_id, + } + + new_connection_data = json.dumps(new_connection) + response = self.request( + f"{self.api_v3_url}/{str(project_id)}", + new_connection_data, + ) + + return response["data"]["id"] + + def create_credentials(self, schema: str, project_id: int) -> int: + """ + Creates credentials - these are needed to create the environment. 
+ + :param schema: Default deployment dataset + :param project_id: ID of the project + :return: ID of created credentials + """ + new_credentials = { + "id": None, + "account_id": self.account_id, + "project_id": project_id, + "type": "bigquery", + "state": 1, + "threads": 4, + "schema": schema, + "target_name": "default", + "created_at": None, + "updated_at": None, + "username": None, + "has_refresh_token": False, + } + + new_credentials_data = json.dumps(new_credentials) + response = self.request( + f"{self.api_v3_url}/{str(project_id)}/credentials/", + new_credentials_data, + ) + + return response["data"]["id"] + + def create_bigquery_connection( + self, + project_id: int, + name: str, + is_active: bool, + gcp_project_id: str, + timeout_seconds: int, + private_key_id: str, + private_key: str, + client_email: str, + client_id: str, + auth_uri: str, + token_uri: str, + auth_provider_x509_cert_url: str, + client_x509_cert_url: str, + retries: int = 1, + location: Optional[str] = None, + maximum_bytes_billed: int = 0, + ) -> int: + """ + Creates dbtCloud connection to BigQuery + :param project_id: Name of the project + :param name: Name of the connection + :param is_active: should connection be active + :return: ID of the created connection + """ + connection_details = { + "project_id": gcp_project_id, + "timeout_seconds": timeout_seconds, + "private_key_id": private_key_id, + "private_key": private_key, + "client_email": client_email, + "client_id": client_id, + "auth_uri": auth_uri, + "token_uri": token_uri, + "auth_provider_x509_cert_url": auth_provider_x509_cert_url, + "client_x509_cert_url": client_x509_cert_url, + "retries": retries, + "location": location, + "maximum_bytes_billed": maximum_bytes_billed, + } + + new_connection = { + "id": None, + "account_id": self.account_id, + "project_id": project_id, + "name": name, + "type": "bigquery", + "state": 1 if is_active else 0, + "details": connection_details, + } + + new_connection_data = 
json.dumps(new_connection).encode() + response = self.request( + f"{self.api_v3_url}/{project_id}/connections/", + new_connection_data, + ) + + return response["data"]["id"] + + def create_job( + self, project_id: int, environment_id: int, schedule_cron: str, name: str, vars: str + ) -> int: + """ + Creates sample job for given project and environment. Job is triggered by the scheduler + executes commands: dbt seed, dbt test and dbt run. + :param project_id: ID of the project + :param environment_id: ID of the environment + :param schedule_cron: Schedule (cron syntax) + :param name: Name of the job + :param vars: Variables passed to commands + :return: ID of created job + """ + + job_details = { + "account_id": self.account_id, + "project_id": project_id, + "id": None, + "environment_id": environment_id, + "name": name, + "dbt_version": None, + "triggers": {"schedule": True, "github_webhook": False}, + "execute_steps": [ + "dbt seed --vars '" + vars + "'", + "dbt run --vars '" + vars + "'", + "dbt test --vars '" + vars + "'", + ], + "settings": {"threads": 1, "target_name": "default"}, + "execution": {"timeout_seconds": 600}, + "state": 1, + "schedule": { + "cron": schedule_cron, + "date": {"type": "every_day"}, + "time": {"type": "every_hour", "interval": 1}, + }, + } + + job_details_data = json.dumps(job_details).encode() + response = self.request( + f"{self.host_url}/v2/accounts/{self.account_id}/jobs/", job_details_data + ) + + return response["data"]["id"] diff --git a/docs/configuration.rst b/docs/configuration.rst index 4813689..647b2eb 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -375,4 +375,64 @@ Example ``looker.yml`` file might look like this: looker_project_id: my_looker_project looker_webhook_secret: "{{ env_var('LOOKER_WEBHOOK_SECRET') }}" looker_repository_branch: main - looker_instance_url: https://looker.company.com/ \ No newline at end of file + looker_instance_url: https://looker.company.com/ + +dbt Cloud configuration 
+++++++++++++++++++++++++++++++ + +``config/dbtcloud.yml`` contains configuration related to dbt Cloud: + +.. list-table:: + :widths: 25 20 55 + :header-rows: 1 + + * - Parameter + - Data type + - Description + * - project_name + - string + - Name of the project to be created in dbt Cloud + * - environments + - Array + - Details of the environments to be created in dbt Cloud + +Configuration of the environments: + +.. list-table:: + :widths: 25 20 55 + :header-rows: 1 + + * - Parameter + - Data type + - Description + * - name + - string + - Name of the environment that will be created in dbt Cloud + * - type + - string + - In dbt Cloud, there are two types of environments: deployment and development. Deployment environments determine the settings used when jobs created within that environment are executed. Development environments determine the settings used in the dbt Cloud IDE for that particular dbt Cloud Project. Each dbt Cloud project can only have a single development environment but can have any number of deployment environments. + * - dbt_version + - string + - The dbt version used in this environment + * - schedule_interval + - string + - The cron expression with which the example job will be run. This setting is only needed for the deployment environment. + * - config_dir + - string + - The name of the dp env directory where the bigquery configuration for the environment is located. The name of the project in GCP and target dataset will be read from it. + +Example ``dbtcloud.yml`` file might look like this: + +.. 
code-block:: yaml + + project_name: "Data Pipelines Project" + environments: + - name: "Develop" + dbt_version: "1.0.0" + type: "development" + config_dir: "dev" + - name: "Production" + dbt_version: "1.0.0" + type: "deployment" + config_dir: "prod" + schedule_interval: "0 12 * * *" diff --git a/docs/integration.rst b/docs/integration.rst index c961b31..0a77659 100644 --- a/docs/integration.rst +++ b/docs/integration.rst @@ -71,3 +71,13 @@ Looker **dp** can generate lookML codes for your models and views, publish and deploy your `Looker `_ project +dbt Cloud +++++++++++++++++++++++++++++++++++++++++++++++ + +The `Data Pipelines CLI` can configure a project in dbt Cloud. The following functions are supported: + +- creation of a project +- adding a repository +- adding a connection to BigQuery +- creation of environments +- creation of sample jobs