Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import functools
import typing
from typing import cast, Any
from typing import Any, cast

import bigframes_vendored.ibis.expr.api as ibis_api
import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes
Expand Down
16 changes: 13 additions & 3 deletions packages/bigframes/bigframes/core/global_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import threading
import traceback
import warnings
from typing import TYPE_CHECKING, Callable, Optional, TypeVar
from typing import TYPE_CHECKING, Callable, Iterable, Optional, TypeVar

import google.auth.exceptions

Expand Down Expand Up @@ -124,12 +124,22 @@ def with_default_session(func_: Callable[..., _T], *args, **kwargs) -> _T:
return func_(get_global_session(), *args, **kwargs)


def execution_history(
    *,
    events: Optional[Iterable[bigframes.core.events.Event]] = None,
    job_ids: Optional[Iterable[str]] = None,
    filter_by_cell: bool = True,
) -> "bigframes.session._ExecutionHistory":
    """Return the history of BigQuery executions for the global session.

    Thin wrapper that forwards to ``Session.execution_history`` on the
    default (global) session via ``with_default_session``.

    Args:
        events (Iterable[Event], optional):
            If given, only include jobs associated with these events.
        job_ids (Iterable[str], optional):
            If given, only include jobs matching these job IDs.
        filter_by_cell (bool, optional):
            If True (default) and running in Colab/Jupyter, restrict
            history to jobs executed within the current cell.

    Returns:
        bigframes.session._ExecutionHistory:
            Use ``.to_dataframe()`` on the result to get a pandas DataFrame.
    """
    # Imported for its side effect of ensuring pandas is available before
    # the history DataFrame is built.
    import pandas  # noqa: F401

    import bigframes.session

    return with_default_session(
        bigframes.session.Session.execution_history,
        events=events,
        job_ids=job_ids,
        filter_by_cell=filter_by_cell,
    )


class _GlobalSessionContext:
Expand Down
6 changes: 6 additions & 0 deletions packages/bigframes/bigframes/pandas/io/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ def _try_read_gbq_colab_sessionless_dry_run(
def _read_gbq_colab( # type: ignore[overload-overlap]
query_or_table: str,
*,
callback: Optional[Callable[[bigframes.core.events.Event], None]] = ...,
pyformat_args: Optional[Dict[str, Any]] = ...,
dry_run: Literal[False] = ...,
) -> bigframes.dataframe.DataFrame: ...
Expand All @@ -309,6 +310,7 @@ def _read_gbq_colab( # type: ignore[overload-overlap]
def _read_gbq_colab(
query_or_table: str,
*,
callback: Optional[Callable[[bigframes.core.events.Event], None]] = ...,
pyformat_args: Optional[Dict[str, Any]] = ...,
dry_run: Literal[True] = ...,
) -> pandas.Series: ...
Expand All @@ -317,6 +319,7 @@ def _read_gbq_colab(
def _read_gbq_colab(
query_or_table: str,
*,
callback: Optional[Callable[[bigframes.core.events.Event], None]] = None,
pyformat_args: Optional[Dict[str, Any]] = None,
dry_run: bool = False,
) -> bigframes.dataframe.DataFrame | pandas.Series:
Expand All @@ -328,6 +331,8 @@ def _read_gbq_colab(
Args:
query_or_table (str):
SQL query or table ID (table ID not yet supported).
callback (Optional[Callable[[bigframes.core.events.Event], None]]):
Callback to receive query execution events.
pyformat_args (Optional[Dict[str, Any]]):
Parameters to format into the query string.
dry_run (bool):
Expand Down Expand Up @@ -379,6 +384,7 @@ def _read_gbq_colab(
return global_session.with_default_session(
bigframes.session.Session._read_gbq_colab,
query_or_table,
callback=callback,
pyformat_args=pyformat_args,
dry_run=dry_run,
)
Expand Down
119 changes: 106 additions & 13 deletions packages/bigframes/bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,18 @@
class _ExecutionHistory:
def __init__(self, jobs: list[dict]):
self._df = pandas.DataFrame(jobs)
if self._df.empty:
self._df = pandas.DataFrame(
columns=[
"job_id",
"query_id",
"job_type",
"status",
"query",
"total_bytes_processed",
"job_url",
]
)

def to_dataframe(self) -> pandas.DataFrame:
"""Returns the execution history as a pandas DataFrame."""
Expand Down Expand Up @@ -199,9 +211,10 @@ def __init__(
self._clients_provider = clients_provider
self._location = context.location or "US"
else:
credentials, project = (
bigframes._config.auth.resolve_credentials_and_project(context)
)
(
credentials,
project,
) = bigframes._config.auth.resolve_credentials_and_project(context)
if context.location is None:
with bigquery.Client(
project=project,
Expand Down Expand Up @@ -430,12 +443,82 @@ def slot_millis_sum(self):
"""The sum of all slot time used by bigquery jobs in this session."""
return self._metrics.slot_millis

def execution_history(
    self,
    *,
    events: Optional[Iterable[bigframes.core.events.Event]] = None,
    job_ids: Optional[Iterable[str]] = None,
    filter_by_cell: bool = True,
) -> _ExecutionHistory:
    """Returns the history of executions initiated by BigFrames in the current session.

    Use `.to_dataframe()` on the result to get a pandas DataFrame.

    Filters are mutually exclusive and applied with this precedence:
    ``events`` takes priority over ``job_ids``, which takes priority over
    ``filter_by_cell``; only the first provided filter is used.

    Args:
        events (Iterable[Event], optional):
            Filter execution history to only include jobs associated with the given events.
        job_ids (Iterable[str], optional):
            Filter execution history to only include jobs matching the given job IDs.
        filter_by_cell (bool, optional):
            If True and running in Colab/Jupyter, automatically filter history to only include
            jobs executed within the current cell. Defaults to True.
    """

    def _has_id(job: dict, key: str, ids: set) -> bool:
        # A job matches when the identifier is present and in ``ids``.
        value = job.get(key)
        return value is not None and value in ids

    jobs = [job.__dict__ for job in self._metrics.jobs]

    if events is not None:
        # Collect the job and query IDs carried by the given events;
        # job IDs only match job IDs, query IDs only match query IDs.
        event_job_ids = {
            jid
            for event in events
            if (jid := getattr(event, "job_id", None)) is not None
        }
        event_query_ids = {
            qid
            for event in events
            if (qid := getattr(event, "query_id", None)) is not None
        }
        jobs = [
            job
            for job in jobs
            if _has_id(job, "job_id", event_job_ids)
            or _has_id(job, "query_id", event_query_ids)
        ]
    elif job_ids is not None:
        # A given ID may match either a job ID or a query ID.
        target_ids = set(job_ids)
        jobs = [
            job
            for job in jobs
            if _has_id(job, "job_id", target_ids)
            or _has_id(job, "query_id", target_ids)
        ]
    elif filter_by_cell:
        # Best-effort: only filter when IPython is importable and exposes
        # an execution counter; otherwise leave the history unfiltered.
        try:
            import IPython

            ipy = IPython.get_ipython()
            if ipy is not None and hasattr(ipy, "execution_count"):
                current_count = ipy.execution_count
                jobs = [
                    job
                    for job in jobs
                    if job.get("cell_execution_count") == current_count
                ]
        except (ImportError, NameError):
            pass

    return _ExecutionHistory(jobs)

@property
def _allows_ambiguity(self) -> bool:
Expand Down Expand Up @@ -584,6 +667,7 @@ def _read_gbq_colab(
self,
query: str,
*,
callback: Optional[Callable[[bigframes.core.events.Event], None]] = ...,
pyformat_args: Optional[Dict[str, Any]] = None,
dry_run: Literal[False] = ...,
) -> dataframe.DataFrame: ...
Expand All @@ -593,6 +677,7 @@ def _read_gbq_colab(
self,
query: str,
*,
callback: Optional[Callable[[bigframes.core.events.Event], None]] = ...,
pyformat_args: Optional[Dict[str, Any]] = None,
dry_run: Literal[True] = ...,
) -> pandas.Series: ...
Expand All @@ -601,8 +686,8 @@ def _read_gbq_colab(
def _read_gbq_colab(
self,
query: str,
# TODO: Add a callback parameter that takes some kind of Event object.
*,
callback: Optional[Callable[[bigframes.core.events.Event], None]] = None,
pyformat_args: Optional[Dict[str, Any]] = None,
dry_run: bool = False,
) -> Union[dataframe.DataFrame, pandas.Series]:
Expand All @@ -615,6 +700,8 @@ def _read_gbq_colab(
query (str):
A SQL query string to execute. Results (if any) are turned into
a DataFrame.
callback (Optional[Callable[[bigframes.core.events.Event], None]]):
Callback to receive query execution events.
pyformat_args (dict):
A dictionary of potential variables to replace in ``query``.
Note: strings are _not_ escaped. Use query parameters for these,
Expand All @@ -634,13 +721,19 @@ def _read_gbq_colab(
dry_run=dry_run,
)

return self._loader.read_gbq_query(
query=query,
index_col=bigframes.enums.DefaultIndexKind.NULL,
force_total_order=False,
dry_run=typing.cast(Union[Literal[False], Literal[True]], dry_run),
allow_large_results=allow_large_results,
)
def _run_query():
return self._loader.read_gbq_query(
query=query,
index_col=bigframes.enums.DefaultIndexKind.NULL,
force_total_order=False,
dry_run=typing.cast(Union[Literal[False], Literal[True]], dry_run),
allow_large_results=allow_large_results,
)

if callback is not None:
with self._publisher.subscribe(callback):
return _run_query()
return _run_query()

@overload
def read_gbq_query( # type: ignore[overload-overlap]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ def create_job_configs_labels(
) -> Dict[str, str]:
if job_configs_labels is None:
job_configs_labels = {}
else:
job_configs_labels = dict(job_configs_labels)

if api_methods and "bigframes-api" not in job_configs_labels:
job_configs_labels["bigframes-api"] = api_methods[0]
Expand Down
2 changes: 1 addition & 1 deletion packages/bigframes/bigframes/session/bigquery_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def close(self):
# Assume this is being called in the user thread, so we can access
# this thread-local config.
job_config=bigquery.QueryJobConfig(
labels=bigframes.options.compute.extra_query_labels
labels=dict(bigframes.options.compute.extra_query_labels)
),
location=self.location,
project=None,
Expand Down
34 changes: 34 additions & 0 deletions packages/bigframes/bigframes/session/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class JobMetadata:
input_bytes: Optional[int] = None
output_rows: Optional[int] = None
source_format: Optional[str] = None
cell_execution_count: Optional[int] = None

@classmethod
def from_job(
Expand All @@ -65,6 +66,16 @@ def from_job(
if job_id:
job_url = f"https://console.cloud.google.com/bigquery?project={query_job.project}&j=bq:{query_job.location}:{job_id}&page=queryresults"

cell_execution_count = None
try:
import IPython

ipy = IPython.get_ipython()
if ipy is not None and hasattr(ipy, "execution_count"):
cell_execution_count = ipy.execution_count
except (ImportError, NameError):
pass

metadata = cls(
job_id=query_job.job_id,
location=query_job.location,
Expand All @@ -78,6 +89,7 @@ def from_job(
error_result=query_job.error_result,
query=query_text,
job_url=job_url,
cell_execution_count=cell_execution_count,
)
if isinstance(query_job, QueryJob):
metadata.cached = getattr(query_job, "cache_hit", None)
Expand Down Expand Up @@ -121,6 +133,16 @@ def from_row_iterator(
location = getattr(row_iterator, "location", "")
job_url = f"https://console.cloud.google.com/bigquery?project={project}&j=bq:{location}:{job_id}&page=queryresults"

cell_execution_count = None
try:
import IPython

ipy = IPython.get_ipython()
if ipy is not None and hasattr(ipy, "execution_count"):
cell_execution_count = ipy.execution_count
except (ImportError, NameError):
pass

return cls(
job_id=job_id,
query_id=getattr(row_iterator, "query_id", None),
Expand All @@ -137,6 +159,7 @@ def from_row_iterator(
cached=getattr(row_iterator, "cache_hit", None),
query=query_text,
job_url=job_url,
cell_execution_count=cell_execution_count,
)


Expand Down Expand Up @@ -249,10 +272,21 @@ def on_event(self, event: Any):
bytes_processed = event.result.total_bytes_processed or 0
self.bytes_processed += bytes_processed

cell_execution_count = None
try:
import IPython

ipy = IPython.get_ipython()
if ipy is not None and hasattr(ipy, "execution_count"):
cell_execution_count = ipy.execution_count
except (ImportError, NameError):
pass

metadata = JobMetadata(
job_type="polars",
status="DONE",
total_bytes_processed=bytes_processed,
cell_execution_count=cell_execution_count,
)
self.jobs.append(metadata)

Expand Down
Loading
Loading