Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ jobs:
run: pip install uv

- name: Install package
run: uv sync --locked --all-extras --group ci --verbose
run: |
uv sync --all-extras --group ci --group dev --verbose

# BigQuery start
# - id: 'auth'
Expand All @@ -50,17 +51,22 @@ jobs:
# - name: 'Use gcloud CLI'
# run: "gcloud config configurations list"

- name: 'Create datadiff SA'
run: "echo '${{ secrets.BQ_SA }}' > bq_sa.json"

# BigQuery end

- name: Run unit tests
env:
DATADIFF_SNOWFLAKE_URI: '${{ secrets.DATADIFF_SNOWFLAKE_URI }}'
DATADIFF_PRESTO_URI: '${{ secrets.DATADIFF_PRESTO_URI }}'
DATADIFF_TRINO_URI: '${{ secrets.DATADIFF_TRINO_URI }}'
# DATADIFF_BIGQUERY_URI: '${{ secrets.DATADIFF_BIGQUERY_URI }}'
GOOGLE_APPLICATION_CREDENTIALS: 'bq_sa.json'
DATADIFF_BIGQUERY_URI: '${{ secrets.DATADIFF_BIGQUERY_URI }}'
DATADIFF_CLICKHOUSE_URI: 'clickhouse://clickhouse:Password1@localhost:9000/clickhouse'
DATADIFF_REDSHIFT_URI: '${{ secrets.DATADIFF_REDSHIFT_URI }}'
MOTHERDUCK_TOKEN: '${{ secrets.MOTHERDUCK_TOKEN }}'
run: |
chmod +x tests/waiting_for_stack_up.sh
./tests/waiting_for_stack_up.sh && TEST_ACROSS_ALL_DBS=0 uv run unittest-parallel -j 16
./tests/waiting_for_stack_up.sh
TEST_ACROSS_ALL_DBS=0 uv run unittest-parallel -j 16
13 changes: 10 additions & 3 deletions data_diff/databases/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -959,7 +959,12 @@ def query(self, sql_ast: Union[Expr, Generator], res_type: type = None, log_mess
It's a cleaner approach than exposing cursors, but may not be enough in all cases.
"""

sql_code: Union[str, ThreadLocalInterpreter]

compiler = Compiler(self)

if self.is_closed:
raise ConnectError("This database connection is closed.")
if isinstance(sql_ast, Generator):
sql_code = ThreadLocalInterpreter(compiler, sql_ast)
elif isinstance(sql_ast, list):
Expand All @@ -973,8 +978,9 @@ def query(self, sql_ast: Union[Expr, Generator], res_type: type = None, log_mess
if res_type is None:
res_type = sql_ast.type
sql_code = self.compile(sql_ast)
if sql_code is SKIP:
return SKIP

if sql_code is SKIP or sql_code == "":
return QueryResult([]) # Return empty QueryResult if no-op

if log_message:
logger.debug("Running SQL (%s): %s \n%s", self.name, log_message, sql_code)
Expand All @@ -987,13 +993,14 @@ def query(self, sql_ast: Union[Expr, Generator], res_type: type = None, log_mess
for row in explain:
# Most returned a 1-tuple. Presto returns a string
if isinstance(row, tuple):
(row,) = row
(row,) = row
logger.debug("EXPLAIN: %s", row)
answer = input("Continue? [y/n] ")
if answer.lower() not in ["y", "yes"]:
sys.exit(1)

res = self._query(sql_code)

if res_type is list:
return list(res)
elif res_type is int:
Expand Down
17 changes: 15 additions & 2 deletions data_diff/databases/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
UnknownColType,
Time,
Date,
TimestampTZ,
)
from data_diff.databases.base import (
BaseDialect,
Expand Down Expand Up @@ -96,11 +97,20 @@ def to_string(self, s: str) -> str:
return f"cast({s} as string)"

def type_repr(self, t) -> str:
    """Return the BigQuery SQL type name for *t*.

    *t* is either a ColType instance (Timestamp, TimestampTZ, Datetime,
    Date, Time, ...) or a plain Python type (str, float) used in casts;
    anything unrecognized is delegated to the base dialect.
    """
    # BigQuery's TIMESTAMP and DATETIME types do not accept a precision
    # argument, so any precision carried by the ColType is dropped here.
    if isinstance(t, (Timestamp, TimestampTZ)):
        return "TIMESTAMP"
    if isinstance(t, Datetime):
        return "DATETIME"
    if isinstance(t, Date):
        return "DATE"
    if isinstance(t, Time):
        return "TIME"
    try:
        return {str: "STRING", float: "FLOAT64"}[t]
    except KeyError:
        # Not a plain Python type we map directly; fall back to the
        # generic representation.
        return super().type_repr(t)


def parse_type(self, table_path: DbPath, info: RawColumnInfo) -> ColType:
col_type = super().parse_type(table_path, info)
if not isinstance(col_type, UnknownColType):
Expand Down Expand Up @@ -151,7 +161,7 @@ def to_comparable(self, value: str, coltype: ColType) -> str:
return super().to_comparable(value, coltype)

def set_timezone_to_utc(self) -> str:
    """Return the SQL needed to force the session timezone to UTC.

    BigQuery stores TIMESTAMP values in UTC and has no session-level
    timezone setting, so there is nothing to execute; returning an empty
    string lets the caller treat this as a no-op query.
    """
    return ""

def parse_table_name(self, name: str) -> DbPath:
path = parse_table_name(name)
Expand Down Expand Up @@ -252,12 +262,15 @@ def __init__(self, project, *, dataset, bigquery_credentials=None, **kw) -> None
target_scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

self._client = bigquery.Client(project=project, credentials=credentials, **kw)
default_config = bigquery.QueryJobConfig(default_dataset=f"{project}.{dataset}")

self._client = bigquery.Client(project=project, credentials=credentials, default_query_job_config=default_config, **kw)
Comment on lines +265 to +267
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Passing default_query_job_config as an explicit keyword argument while it might also be present in **kw will cause a TypeError (multiple values for argument) if a user provides it in the connection parameters. It is safer to pop it from kw and then configure it.

Suggested change
default_config = bigquery.QueryJobConfig(default_dataset=f"{project}.{dataset}")
self._client = bigquery.Client(project=project, credentials=credentials, default_query_job_config=default_config, **kw)
default_config = kw.pop("default_query_job_config", bigquery.QueryJobConfig())
default_config.default_dataset = f"{project}.{dataset}"
self._client = bigquery.Client(project=project, credentials=credentials, default_query_job_config=default_config, **kw)

self.project = project
self.dataset = dataset

self.default_schema = dataset


def _normalize_returned_value(self, value):
if isinstance(value, bytes):
return value.decode()
Expand Down
2 changes: 1 addition & 1 deletion data_diff/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.11.1"
__version__ = "0.12.1"
3 changes: 3 additions & 0 deletions dev/dev.env
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ CLICKHOUSE_USER=clickhouse
CLICKHOUSE_PASSWORD=Password1
CLICKHOUSE_DB=clickhouse
CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1

GOOGLE_APPLICATION_CREDENTIALS=bq.sa
DATADIFF_BIGQUERY_URI=bigquery://dms-analytics-v2-data-diff/test
Comment on lines +15 to +16
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Hardcoding specific project IDs (dms-analytics-v2-data-diff) and local file paths for credentials (bq.sa) in a shared environment file reduces portability and can lead to configuration issues for other contributors. It is recommended to use placeholders or provide a .env.example file instead.

1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ services:
container_name: dd-clickhouse
image: clickhouse/clickhouse-server:21.12.3.32
restart: always
user: "101:101"
volumes:
- clickhouse-data:/var/lib/clickhouse:delegated
ulimits:
Expand Down
7 changes: 5 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "data-diff"
version = "0.12.0"
version = "0.12.1"
description = "Command-line tool and Python library to efficiently diff rows across two different databases."
readme = "README.md"
requires-python = ">=3.9,<4.0"
Expand All @@ -23,12 +23,13 @@ classifiers = [
"Typing :: Typed",
]
dependencies = [
"setuptools>=69,<71",
"pydantic<2.0",
"dsnparse<0.2.0",
"click==8.1.7",
"rich",
"toml>=0.10.2",
"dbt-core>=1.0.0,<2.0.0",
"dbt-core>=1.7.0,<2.0.0",
"keyring",
"tabulate==0.9.0",
"urllib3<2",
Expand All @@ -50,6 +51,7 @@ trino = ["trino>=0.314.0"]
clickhouse = ["clickhouse-driver"]
vertica = ["vertica-python"]
duckdb = ["duckdb"]
bigquery = ["google-cloud-bigquery"]
all-dbs = [
"preql>=0.2.19",
"mysql-connector-python==8.0.29",
Expand All @@ -63,6 +65,7 @@ all-dbs = [
"clickhouse-driver",
"vertica-python",
"duckdb",
"google-cloud-bigquery",
]

[project.scripts]
Expand Down
6 changes: 5 additions & 1 deletion tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ def run_datadiff_cli(*args):
logging.error(e.stderr)
raise
if stderr:
raise Exception(stderr)
stderr_str = stderr.decode()
if "Traceback" in stderr_str or "Error" in stderr_str or "Exception" in stderr_str:
# Ignore FutureWarning
if "FutureWarning" not in stderr_str:
raise Exception(stderr_str)
return stdout.splitlines()


Expand Down
Loading
Loading