|
7 | 7 | ------------------- |
8 | 8 | ConstantColumnsFunction - Demonstrates varargs with dynamic output schema |
9 | 9 | DoubleSequenceFunction - Generates a sequence of floats 0.0..n-1 |
| 10 | +FilterEchoFunction - Echoes pushed-down filter predicates in output |
10 | 11 | GeneratorExceptionFunction - Demonstrates exception handling |
11 | 12 | LoggingGeneratorFunction - Demonstrates log message emission |
12 | 13 | NamedParamsEchoFunction - Echoes named parameter values in output columns |
|
35 | 36 | from vgi.invocation import BindResponse, GlobalInitResponse |
36 | 37 | from vgi.metadata import FunctionExample |
37 | 38 | from vgi.schema_utils import schema |
| 39 | +from vgi.table_filter_pushdown import PushdownFilters |
38 | 40 | from vgi.table_function import ( |
39 | 41 | BindParams, |
40 | 42 | InitParams, |
|
48 | 50 | __all__ = [ |
49 | 51 | "ConstantColumnsFunction", |
50 | 52 | "DoubleSequenceFunction", |
| 53 | + "FilterEchoFunction", |
51 | 54 | "GeneratorExceptionFunction", |
52 | 55 | "LoggingGeneratorFunction", |
53 | 56 | "NamedParamsEchoFunction", |
@@ -1440,3 +1443,131 @@ def process( |
1440 | 1443 | ) |
1441 | 1444 | out.emit(batch) |
1442 | 1445 | out.finish() |
| 1446 | + |
| 1447 | + |
| 1448 | +# ============================================================================= |
| 1449 | +# FilterEchoFunction — diagnostic: echoes pushed-down filter predicates |
| 1450 | +# ============================================================================= |
| 1451 | + |
| 1452 | + |
| 1453 | +def _format_pushed_filters(filters: PushdownFilters | None) -> str: |
| 1454 | + """Format pushed-down filters as a human-readable SQL-like string.""" |
| 1455 | + if not filters: |
| 1456 | + return "(none)" |
| 1457 | + sql, params = filters.to_sql(quote_identifier=lambda s: s) |
| 1458 | + if not sql: |
| 1459 | + return "(none)" |
| 1460 | + # Replace ?-placeholders positionally to avoid issues if param values contain "?" |
| 1461 | + parts: list[str] = [] |
| 1462 | + param_iter = iter(params) |
| 1463 | + for chunk in sql.split("?"): |
| 1464 | + parts.append(chunk) |
| 1465 | + try: |
| 1466 | + p = next(param_iter) |
| 1467 | + parts.append(repr(p) if isinstance(p, str) else str(p)) |
| 1468 | + except StopIteration: |
| 1469 | + pass |
| 1470 | + return "".join(parts) |
| 1471 | + |
| 1472 | + |
| 1473 | +@dataclass(slots=True, frozen=True) |
| 1474 | +class FilterEchoFunctionArgs: |
| 1475 | + """Arguments for FilterEchoFunction.""" |
| 1476 | + |
| 1477 | + count: Annotated[int, Arg(0, doc="Number of rows to generate", ge=0, default=10)] |
| 1478 | + batch_size: Annotated[int, Arg("batch_size", default=2048, doc="Batch size for output", ge=1)] |
| 1479 | + |
| 1480 | + |
| 1481 | +@dataclass(kw_only=True) |
| 1482 | +class FilterEchoState(ArrowSerializableDataclass): |
| 1483 | + """Mutable state tracking remaining rows, position, and cached filter string.""" |
| 1484 | + |
| 1485 | + remaining: int |
| 1486 | + current_index: int = 0 |
| 1487 | + filter_str: Annotated[str, Transient()] = "(none)" |
| 1488 | + |
| 1489 | + |
| 1490 | +@init_single_worker |
| 1491 | +@bind_fixed_schema |
| 1492 | +@_cardinality_from_count |
| 1493 | +class FilterEchoFunction(TableFunctionGenerator[FilterEchoFunctionArgs, FilterEchoState]): |
| 1494 | + """Echoes pushed-down filter predicates in output for diagnostic purposes. |
| 1495 | +
|
| 1496 | + USE CASE |
| 1497 | + -------- |
| 1498 | + Verify which filters DuckDB pushes down to the VGI worker. The |
| 1499 | + ``pushed_filters`` column shows the SQL-like representation of all |
| 1500 | + filters the engine sent. Filters are auto-applied by the worker so |
| 1501 | + the result set is always correct. |
| 1502 | +
|
| 1503 | + SCHEMA |
| 1504 | + ------ |
| 1505 | + Output: {"n": int64, "s": string, "pushed_filters": string} |
| 1506 | +
|
| 1507 | + Example: |
| 1508 | + ------- |
| 1509 | + SELECT * FROM filter_echo(10) WHERE n >= 8 |
| 1510 | + Returns: rows 8-9 with pushed_filters showing "n >= 8" |
| 1511 | +
|
| 1512 | + """ |
| 1513 | + |
| 1514 | + class Meta: |
| 1515 | + """Metadata for FilterEchoFunction.""" |
| 1516 | + |
| 1517 | + name = "filter_echo" |
| 1518 | + description = "Echoes pushed-down filter predicates in output" |
| 1519 | + categories = ["generator", "diagnostic"] |
| 1520 | + filter_pushdown = True |
| 1521 | + auto_apply_filters = True |
| 1522 | + projection_pushdown = True |
| 1523 | + examples = [ |
| 1524 | + FunctionExample( |
| 1525 | + sql="SELECT * FROM filter_echo(10)", |
| 1526 | + description="Generate 10 rows showing pushed filters", |
| 1527 | + ), |
| 1528 | + FunctionExample( |
| 1529 | + sql="SELECT pushed_filters FROM filter_echo(10) WHERE n >= 8", |
| 1530 | + description="See which filters were pushed down", |
| 1531 | + ), |
| 1532 | + ] |
| 1533 | + |
| 1534 | + FIXED_SCHEMA: ClassVar[pa.Schema] = schema({"n": pa.int64(), "s": pa.utf8(), "pushed_filters": pa.utf8()}) |
| 1535 | + |
| 1536 | + @classmethod |
| 1537 | + def initial_state(cls, params: ProcessParams[FilterEchoFunctionArgs]) -> FilterEchoState: |
| 1538 | + """Create initial state with remaining count and cached filter string.""" |
| 1539 | + pf = params.init_call.pushdown_filters |
| 1540 | + filters = cls.pushdown_filters(pf) if pf is not None else None |
| 1541 | + return FilterEchoState( |
| 1542 | + remaining=params.args.count, |
| 1543 | + filter_str=_format_pushed_filters(filters), |
| 1544 | + ) |
| 1545 | + |
| 1546 | + @classmethod |
| 1547 | + def process( |
| 1548 | + cls, |
| 1549 | + params: ProcessParams[FilterEchoFunctionArgs], |
| 1550 | + state: FilterEchoState, |
| 1551 | + out: OutputCollector, |
| 1552 | + ) -> None: |
| 1553 | + """Generate rows with n, s, and pushed_filters columns.""" |
| 1554 | + if state.remaining <= 0: |
| 1555 | + out.finish() |
| 1556 | + return |
| 1557 | + |
| 1558 | + size = min(state.remaining, params.args.batch_size) |
| 1559 | + start = state.current_index |
| 1560 | + |
| 1561 | + n_values = list(range(start, start + size)) |
| 1562 | + s_values = [f"row_{i}" for i in n_values] |
| 1563 | + filter_values = [state.filter_str] * size |
| 1564 | + |
| 1565 | + out.emit( |
| 1566 | + pa.RecordBatch.from_pydict( |
| 1567 | + {"n": n_values, "s": s_values, "pushed_filters": filter_values}, |
| 1568 | + schema=params.output_schema, |
| 1569 | + ) |
| 1570 | + ) |
| 1571 | + |
| 1572 | + state.current_index += size |
| 1573 | + state.remaining -= size |
0 commit comments