From 5b68398715826ed9ffabe67ca7adf2370a667e4f Mon Sep 17 00:00:00 2001 From: Alexander Belikov Date: Tue, 31 Mar 2026 23:55:12 +0200 Subject: [PATCH 1/4] better resource - connector link --- graflo/__init__.py | 4 +- graflo/architecture/__init__.py | 4 +- graflo/architecture/contract/__init__.py | 4 +- .../contract/bindings/__init__.py | 4 +- .../contract/bindings/connectors.py | 46 ++--- graflo/architecture/contract/bindings/core.py | 110 +++++------ graflo/db/tigergraph/conn.py | 19 +- graflo/hq/auto_join.py | 17 +- graflo/hq/caster.py | 4 +- graflo/hq/registry_builder.py | 177 +++++++++--------- graflo/util/onto.py | 4 +- .../architecture/test_connector_conn_proxy.py | 21 ++- .../test_manifest_canonical_contract.py | 2 +- test/architecture/test_resource_filters.py | 28 ++- test/data_source/sparql/test_rdf_inference.py | 6 +- .../db/postgres/test_ingest_datetime_range.py | 10 +- test/test_filter_view.py | 2 +- test/test_patterns.py | 89 +++++---- 18 files changed, 298 insertions(+), 253 deletions(-) diff --git a/graflo/__init__.py b/graflo/__init__.py index 3c179bd1..52606bea 100644 --- a/graflo/__init__.py +++ b/graflo/__init__.py @@ -47,8 +47,8 @@ GraphModel, Index, IngestionModel, + BoundSourceKind, ResourceConnector, - ResourceType, Resource, SparqlConnector, Schema, @@ -135,8 +135,8 @@ "FileConnector", "Bindings", "JoinClause", + "BoundSourceKind", "ResourceConnector", - "ResourceType", "SparqlConnector", "TableConnector", ] diff --git a/graflo/architecture/__init__.py b/graflo/architecture/__init__.py index 01aa183a..02389c2f 100644 --- a/graflo/architecture/__init__.py +++ b/graflo/architecture/__init__.py @@ -17,8 +17,8 @@ JoinClause, ProtoTransform, Resource, + BoundSourceKind, ResourceConnector, - ResourceType, SparqlConnector, TableConnector, Transform, @@ -54,8 +54,8 @@ "JoinClause", "ProtoTransform", "Resource", + "BoundSourceKind", "ResourceConnector", - "ResourceType", "Schema", "SchemaDBAware", "SparqlConnector", diff --git a/graflo/architecture/contract/__init__.py b/graflo/architecture/contract/__init__.py index 0c517596..6525e6b0 100644 --- a/graflo/architecture/contract/__init__.py +++ b/graflo/architecture/contract/__init__.py @@ -2,10 +2,10 @@ from .bindings import ( Bindings, + BoundSourceKind, FileConnector, JoinClause, ResourceConnector, - ResourceType, SparqlConnector, TableConnector, ) @@ -19,6 +19,7 @@ __all__ = [ "Bindings", + "BoundSourceKind", "FileConnector", "GraphManifest", "IngestionModel", @@ -26,7 +27,6 @@ "ProtoTransform", "Resource", "ResourceConnector", - "ResourceType", "SparqlConnector", "TableConnector", "Transform", diff --git a/graflo/architecture/contract/bindings/__init__.py b/graflo/architecture/contract/bindings/__init__.py index a26b4e13..83ee0818 100644 --- a/graflo/architecture/contract/bindings/__init__.py +++ b/graflo/architecture/contract/bindings/__init__.py @@ -2,21 +2,21 @@ from .core import Bindings, ResourceConnectorBinding from .connectors import ( + BoundSourceKind, FileConnector, JoinClause, ResourceConnector, - ResourceType, SparqlConnector, TableConnector, ) __all__ = [ "Bindings", + "BoundSourceKind", "ResourceConnectorBinding", "FileConnector", "JoinClause", "ResourceConnector", - "ResourceType", "SparqlConnector", "TableConnector", ] diff --git a/graflo/architecture/contract/bindings/connectors.py b/graflo/architecture/contract/bindings/connectors.py index 07e646b7..c47d86b3 100644 --- a/graflo/architecture/contract/bindings/connectors.py +++ b/graflo/architecture/contract/bindings/connectors.py @@ -23,17 +23,17 @@ PostgresConfig = Any -class ResourceType(BaseEnum): - """Resource types for data sources. +class BoundSourceKind(BaseEnum): + """Physical source modality for a bound connector (how rows are retrieved). - Resource types distinguish between different data source categories. - File type detection (CSV, JSON, JSONL, Parquet, etc.) is handled - automatically by the loader based on file extensions. + This describes the connector-backed access pattern, not the abstract + ingestion resource. File format (CSV, JSON, etc.) is chosen by the loader + from file extensions. Attributes: - FILE: File-based data source (any format: CSV, JSON, JSONL, Parquet, etc.) - SQL_TABLE: SQL database table (e.g., PostgreSQL table) - SPARQL: SPARQL / RDF data source (endpoint or .ttl/.rdf files via rdflib) + FILE: File-based connector (directory + pattern or paths). + SQL_TABLE: SQL table / database-backed connector. + SPARQL: SPARQL / RDF connector (endpoint or local RDF via rdflib). """ FILE = "file" @@ -99,12 +99,8 @@ def matches(self, resource_identifier: str) -> bool: pass @abc.abstractmethod - def get_resource_type(self) -> ResourceType: - """Get the type of resource this connector matches. - - Returns: - ResourceType: Resource type enum value - """ + def bound_source_kind(self) -> BoundSourceKind: + """Return the physical source kind for this connector.""" pass @@ -153,14 +149,9 @@ def matches(self, resource_identifier: str) -> bool: return False return bool(re.match(self.regex, resource_identifier)) - def get_resource_type(self) -> ResourceType: - """Get resource type. - - FileConnector always represents a FILE resource type. - The specific file format (CSV, JSON, JSONL, Parquet, etc.) is - automatically detected by the loader based on file extensions. - """ - return ResourceType.FILE + def bound_source_kind(self) -> BoundSourceKind: + """File connector always uses ``BoundSourceKind.FILE``.""" + return BoundSourceKind.FILE class JoinClause(ConfigBaseModel): @@ -306,9 +297,8 @@ def matches(self, resource_identifier: str) -> bool: return False - def get_resource_type(self) -> ResourceType: - """Get resource type.""" - return ResourceType.SQL_TABLE + def bound_source_kind(self) -> BoundSourceKind: + return BoundSourceKind.SQL_TABLE def build_where_clause(self) -> str: """Build SQL WHERE clause from date filtering parameters **and** general filters. @@ -468,9 +458,9 @@ def matches(self, resource_identifier: str) -> bool: local_name = self.rdf_class.rsplit("#", 1)[-1].rsplit("/", 1)[-1] return resource_identifier == local_name - def get_resource_type(self) -> ResourceType: - """Return ``ResourceType.SPARQL``.""" - return ResourceType.SPARQL + def bound_source_kind(self) -> BoundSourceKind: + """Return ``BoundSourceKind.SPARQL``.""" + return BoundSourceKind.SPARQL def build_select_query(self) -> str: """Build a SPARQL SELECT query for instances of ``rdf_class``. diff --git a/graflo/architecture/contract/bindings/core.py b/graflo/architecture/contract/bindings/core.py index 104e9532..c6bcedb0 100644 --- a/graflo/architecture/contract/bindings/core.py +++ b/graflo/architecture/contract/bindings/core.py @@ -10,7 +10,6 @@ from .connectors import ( FileConnector, ResourceConnector, - ResourceType, SparqlConnector, TableConnector, ) @@ -59,7 +58,9 @@ class Bindings(ConfigBaseModel): ) _connectors_index: dict[str, ResourceConnector] = PrivateAttr(default_factory=dict) _connectors_name_index: dict[str, str] = PrivateAttr(default_factory=dict) - _resource_to_connector_hash: dict[str, str] = PrivateAttr(default_factory=dict) + _resource_to_connector_hashes: dict[str, list[str]] = PrivateAttr( + default_factory=dict + ) _connector_to_conn_proxy: dict[str, str] = PrivateAttr(default_factory=dict) @property @@ -91,6 +92,14 @@ def _rebuild_indexes(self) -> None: ) self._connectors_name_index[connector.name] = connector.hash + def _append_resource_connector_hash( + self, resource_name: str, connector_hash: str + ) -> None: + """Append *connector_hash* for *resource_name* if not already present (order kept).""" + bucket = self._resource_to_connector_hashes.setdefault(resource_name, []) + if connector_hash not in bucket: + bucket.append(connector_hash) + @field_validator("resource_connector", mode="before") @classmethod def _coerce_resource_connector_entries( @@ -189,7 +198,7 @@ def default_connector_name(connector: ResourceConnector) -> str: @model_validator(mode="after") def _populate_resource_connector(self) -> Self: self._rebuild_indexes() - self._resource_to_connector_hash = {} + self._resource_to_connector_hashes = {} # Create typed views so internal code never has to handle dicts. self._resource_connector_typed = [ @@ -204,45 +213,37 @@ def _populate_resource_connector(self) -> Self: for connector in self.connectors: if connector.resource_name is None: continue - existing_hash = self._resource_to_connector_hash.get( - connector.resource_name + self._append_resource_connector_hash( + connector.resource_name, connector.hash ) - if existing_hash is not None and existing_hash != connector.hash: - raise ValueError( - "Conflicting resource binding for resource " - f"'{connector.resource_name}'." - ) - self._resource_to_connector_hash[connector.resource_name] = connector.hash for mapping in self._resource_connector_typed: connector_hash = self._connectors_name_index.get(mapping.connector) if connector_hash is None: - raise ValueError( - f"resource_connector references unknown connector '{mapping.connector}' " - f"for resource '{mapping.resource}'." - ) - existing_hash = self._resource_to_connector_hash.get(mapping.resource) - if existing_hash is not None and existing_hash != connector_hash: - raise ValueError( - f"Conflicting resource binding for resource '{mapping.resource}'." - ) - self._resource_to_connector_hash[mapping.resource] = connector_hash + if mapping.connector in self._connectors_index: + connector_hash = mapping.connector + else: + raise ValueError( + f"resource_connector references unknown connector '{mapping.connector}' " + f"for resource '{mapping.resource}'." + ) + self._append_resource_connector_hash(mapping.resource, connector_hash) self._rebuild_connector_to_conn_proxy() return self def _resolve_connector_ref_to_hash(self, connector_ref: str) -> str: """Resolve a connector reference to its canonical connector hash. - The contract allows referencing either: + Allowed references: - ``connector.hash`` (canonical internal id), or - ``connector.name`` (when a name is provided / auto-filled). - - ``resource_name`` (alias when ``connector.name`` is omitted in manifests). + + Ingestion resource names are not valid connector references (a resource + may map to multiple connectors). """ if connector_ref in self._connectors_index: return connector_ref resolved_hash = self._connectors_name_index.get(connector_ref) - if resolved_hash is None: - resolved_hash = self._resource_to_connector_hash.get(connector_ref) if resolved_hash is None: raise ValueError(f"Unknown connector reference '{connector_ref}'") return resolved_hash @@ -358,15 +359,9 @@ def add_connector( self.connectors.append(connector) self._rebuild_indexes() if connector.resource_name is not None: - existing_hash = self._resource_to_connector_hash.get( - connector.resource_name + self._append_resource_connector_hash( + connector.resource_name, connector.hash ) - if existing_hash is not None and existing_hash != connector.hash: - raise ValueError( - "Conflicting resource binding for resource " - f"'{connector.resource_name}'." - ) - self._resource_to_connector_hash[connector.resource_name] = connector.hash def bind_resource( self, @@ -375,43 +370,24 @@ def bind_resource( ) -> None: if connector.hash not in self._connectors_index: raise KeyError(f"Connector not found for hash='{connector.hash}'") - self._resource_to_connector_hash[resource_name] = connector.hash + self._append_resource_connector_hash(resource_name, connector.hash) connector_name = connector.name or self.default_connector_name(connector) - mapping_idx = None - for idx, mapping in enumerate(self._resource_connector_typed): - if mapping.resource == resource_name: - mapping_idx = idx - break - new_mapping = ResourceConnectorBinding( - resource=resource_name, - connector=connector_name, + self._resource_connector_typed.append( + ResourceConnectorBinding( + resource=resource_name, + connector=connector_name, + ) ) - if mapping_idx is None: - self._resource_connector_typed.append(new_mapping) - else: - self._resource_connector_typed[mapping_idx] = new_mapping # Keep the public contract field in sync for serialization / downstream. self.resource_connector = list(self._resource_connector_typed) - def get_connector_for_resource( + def get_connectors_for_resource( self, resource_name: str - ) -> TableConnector | FileConnector | SparqlConnector | None: - connector_hash = self._resource_to_connector_hash.get(resource_name) - if connector_hash is None: - return None - connector = self._connectors_index.get(connector_hash) - if isinstance(connector, (TableConnector, FileConnector, SparqlConnector)): - return connector - return None - - def get_resource_type(self, resource_name: str) -> ResourceType | None: - connector = self.get_connector_for_resource(resource_name) - if connector is None: - return None - return connector.get_resource_type() - - def get_table_info(self, resource_name: str) -> tuple[str, str | None] | None: - connector = self.get_connector_for_resource(resource_name) - if isinstance(connector, TableConnector): - return (connector.table_name, connector.schema_name) - return None + ) -> list[TableConnector | FileConnector | SparqlConnector]: + """Return connectors bound to *resource_name*, in binding order (unique by hash).""" + result: list[TableConnector | FileConnector | SparqlConnector] = [] + for h in self._resource_to_connector_hashes.get(resource_name, []): + c = self._connectors_index.get(h) + if isinstance(c, (TableConnector, FileConnector, SparqlConnector)): + result.append(c) + return result diff --git a/graflo/db/tigergraph/conn.py b/graflo/db/tigergraph/conn.py index f0324c89..88bd53ca 100644 --- a/graflo/db/tigergraph/conn.py +++ b/graflo/db/tigergraph/conn.py @@ -3110,7 +3110,24 @@ def clear_data(self, schema: Schema) -> None: vertex_types = tuple(vc.vertex_dbname(v) for v in vc.vertex_set) if vertex_types: with self._ensure_graph_context(graph_name=graph_name): - self.delete_graph_structure(vertex_types=vertex_types) + for vertex_type in vertex_types: + try: + result = self._delete_vertices( + vertex_type=vertex_type, graph_name=graph_name + ) + logger.debug( + "Deleted vertices from %s in graph %s: %s", + vertex_type, + graph_name, + result, + ) + except Exception as e: + logger.error( + "Error deleting vertices from %s in graph %s: %s", + vertex_type, + graph_name, + e, + ) def _generate_upsert_payload( self, data: list[dict[str, Any]], vname: str, vindex: tuple[str, ...] diff --git a/graflo/hq/auto_join.py b/graflo/hq/auto_join.py index a35090a4..f0ac1034 100644 --- a/graflo/hq/auto_join.py +++ b/graflo/hq/auto_join.py @@ -144,10 +144,23 @@ def _vertex_table_info( """Return (table_name, schema_name, primary_key_field) for a vertex. Returns None if the vertex has no table connector in *bindings*. + Raises ValueError if more than one :class:`TableConnector` is bound to + the same *vertex_name* (auto-join requires a unique SQL source). """ - connector = bindings.get_connector_for_resource(vertex_name) - if not isinstance(connector, TableConnector): + table_connectors = [ + c + for c in bindings.get_connectors_for_resource(vertex_name) + if isinstance(c, TableConnector) + ] + if not table_connectors: return None + if len(table_connectors) > 1: + refs = ", ".join(c.name or c.hash for c in table_connectors) + raise ValueError( + f"Multiple TableConnectors bound to resource/vertex key '{vertex_name}' " + f"({refs}); disambiguate before using auto-join." + ) + connector = table_connectors[0] try: pk_fields = vertex_config.identity_fields(vertex_name) except (KeyError, IndexError): diff --git a/graflo/hq/caster.py b/graflo/hq/caster.py index ba68f0ff..d4970c3a 100644 --- a/graflo/hq/caster.py +++ b/graflo/hq/caster.py @@ -314,9 +314,7 @@ async def process_data_source( """ actual_resource_name = resource_name or data_source.resource_name - limit = getattr(data_source, "_pattern_limit", None) - if limit is None: - limit = self.ingestion_params.max_items + limit = self.ingestion_params.max_items for batch in data_source.iter_batches( batch_size=self.ingestion_params.batch_size, limit=limit diff --git a/graflo/hq/registry_builder.py b/graflo/hq/registry_builder.py index 924f558f..21ddf4d9 100644 --- a/graflo/hq/registry_builder.py +++ b/graflo/hq/registry_builder.py @@ -1,7 +1,7 @@ """Build a :class:`DataSourceRegistry` from :class:`Bindings` and schema models. Handles file discovery, SQL table source creation (with auto-JOIN -enrichment and datetime filtering), and connector dispatch by resource type. +enrichment and datetime filtering), and connector dispatch by bound source kind. """ from __future__ import annotations @@ -23,8 +23,8 @@ SparqlGeneralizedConnConfig, ) from graflo.architecture.contract.bindings import ( + BoundSourceKind, FileConnector, - ResourceType, SparqlConnector, TableConnector, ) @@ -61,8 +61,8 @@ def build( ) -> DataSourceRegistry: """Return a populated :class:`DataSourceRegistry`. - Iterates over every resource in the schema, looks up its connector and - resource type, then delegates to the appropriate registration helper. + For each ingestion resource, registers every bound connector (same + resource may have multiple physical sources). """ registry = DataSourceRegistry() provider = connection_provider or EmptyConnectionProvider() @@ -76,89 +76,100 @@ def build( resource_name = resource.name if resources_filter is not None and resource_name not in resources_filter: continue - resource_type = bindings.get_resource_type(resource_name) - - if resource_type is None: - msg = f"No resource type found for resource '{resource_name}'" - logger.warning("%s, skipping", msg) - failures.append(msg) - continue - - connector = bindings.get_connector_for_resource(resource_name) - if connector is None: - msg = f"No connector found for resource '{resource_name}'" + connectors = bindings.get_connectors_for_resource(resource_name) + if not connectors: + msg = f"No connectors bound for resource '{resource_name}'" logger.warning("%s, skipping", msg) failures.append(msg) continue - if resource_type == ResourceType.FILE: - if not isinstance(connector, FileConnector): - msg = f"Connector for resource '{resource_name}' is not a FileConnector" - logger.warning("%s, skipping", msg) - failures.append(msg) - continue - try: - self._register_file_sources( - registry, resource_name, connector, ingestion_params - ) - except Exception as e: - msg = ( - f"Failed to register FILE source for resource " - f"'{resource_name}': {e}" - ) - failures.append(msg) - if strict: + for connector in connectors: + cref = connector.name or connector.hash + kind = connector.bound_source_kind() + + if kind == BoundSourceKind.FILE: + if not isinstance(connector, FileConnector): + msg = ( + f"Connector '{cref}' for resource '{resource_name}' " + f"is not a FileConnector" + ) + logger.warning("%s, skipping", msg) + failures.append(msg) continue - - elif resource_type == ResourceType.SQL_TABLE: - if not isinstance(connector, TableConnector): - msg = f"Connector for resource '{resource_name}' is not a TableConnector" - logger.warning("%s, skipping", msg) - failures.append(msg) - continue - try: - self._register_sql_table_sources( - registry, - resource_name, - connector, - bindings, - ingestion_params, - provider, - ) - except Exception as e: - msg = f"Failed to register SQL source for resource '{resource_name}': {e}" - failures.append(msg) - if strict: + try: + self._register_file_sources( + registry, resource_name, connector, ingestion_params + ) + except Exception as e: + msg = ( + f"Failed to register FILE source for resource " + f"'{resource_name}' (connector '{cref}'): {e}" + ) + failures.append(msg) + if strict: + continue + + elif kind == BoundSourceKind.SQL_TABLE: + if not isinstance(connector, TableConnector): + msg = ( + f"Connector '{cref}' for resource '{resource_name}' " + f"is not a TableConnector" + ) + logger.warning("%s, skipping", msg) + failures.append(msg) continue + try: + self._register_sql_table_sources( + registry, + resource_name, + connector, + bindings, + ingestion_params, + provider, + ) + except Exception as e: + msg = ( + f"Failed to register SQL source for resource " + f"'{resource_name}' (connector '{cref}'): {e}" + ) + failures.append(msg) + if strict: + continue + + elif kind == BoundSourceKind.SPARQL: + if not isinstance(connector, SparqlConnector): + msg = ( + f"Connector '{cref}' for resource '{resource_name}' " + f"is not a SparqlConnector" + ) + logger.warning("%s, skipping", msg) + failures.append(msg) + continue + try: + self._register_sparql_sources( + registry, + resource_name, + connector, + bindings, + ingestion_params, + provider, + ) + except Exception as e: + msg = ( + f"Failed to register SPARQL source for resource " + f"'{resource_name}' (connector '{cref}'): {e}" + ) + failures.append(msg) + if strict: + continue - elif resource_type == ResourceType.SPARQL: - if not isinstance(connector, SparqlConnector): - msg = f"Connector for resource '{resource_name}' is not a SparqlConnector" - logger.warning("%s, skipping", msg) - failures.append(msg) - continue - try: - self._register_sparql_sources( - registry, - resource_name, - connector, - bindings, - ingestion_params, - provider, + else: + msg = ( + f"Unsupported bound source kind '{kind}' " + f"for resource '{resource_name}' (connector '{cref}')" ) - except Exception as e: - msg = f"Failed to register SPARQL source for resource '{resource_name}': {e}" + logger.warning("%s, skipping", msg) failures.append(msg) - if strict: - continue - - else: - msg = ( - f"Unsupported resource type '{resource_type}' " - f"for resource '{resource_name}'" - ) - logger.warning("%s, skipping", msg) - failures.append(msg) if strict and failures: details = "\n".join(f"- {item}" for item in failures) @@ -272,14 +283,8 @@ def _register_sql_table_sources( ) return - table_info = bindings.get_table_info(resource_name) - if table_info is None: - logger.warning( - f"Could not get table info for resource '{resource_name}', skipping" - ) - return - - table_name, schema_name = table_info + table_name = connector.table_name + schema_name = connector.schema_name effective_schema = schema_name or postgres_config.schema_name or "public" try: diff --git a/graflo/util/onto.py b/graflo/util/onto.py index 7266e9b3..e30fd85f 100644 --- a/graflo/util/onto.py +++ b/graflo/util/onto.py @@ -5,20 +5,20 @@ from graflo.architecture.contract.bindings import ( Bindings, + BoundSourceKind, FileConnector, JoinClause, ResourceConnector, - ResourceType, SparqlConnector, TableConnector, ) __all__ = [ "Bindings", + "BoundSourceKind", "FileConnector", "JoinClause", "ResourceConnector", - "ResourceType", "SparqlConnector", "TableConnector", ] diff --git a/test/architecture/test_connector_conn_proxy.py b/test/architecture/test_connector_conn_proxy.py index bf57330e..68a15d93 100644 --- a/test/architecture/test_connector_conn_proxy.py +++ b/test/architecture/test_connector_conn_proxy.py @@ -27,8 +27,10 @@ def test_bindings_connector_connection_resolves_by_name_and_hash() -> None: assert bindings_by_hash.get_conn_proxy_for_connector(connector) == "pg2" -def test_bindings_connector_connection_resolves_by_resource_name() -> None: - # connector.name omitted; manifests can use connector.resource_name as alias. +def test_bindings_connector_connection_resolves_by_connector_hash_without_name() -> ( + None +): + """connector_connection must reference connector hash or name, not resource_name.""" connector = TableConnector( table_name="t1", schema_name="public", @@ -36,7 +38,7 @@ def test_bindings_connector_connection_resolves_by_resource_name() -> None: ) bindings = Bindings( connectors=[connector], - connector_connection=[{"connector": "people", "conn_proxy": "pg"}], + connector_connection=[{"connector": connector.hash, "conn_proxy": "pg"}], ) assert bindings.get_conn_proxy_for_connector(connector) == "pg" @@ -81,8 +83,7 @@ def test_provider_resolves_connector_based_config_for_multiple_resources() -> No assert cfg2.uri == pg_cfg.uri -def test_provider_bind_from_bindings_supports_resource_alias() -> None: - # connector.name omitted; manifests can use connector.resource_name as alias. +def test_provider_bind_from_bindings_supports_connector_hash() -> None: connector = TableConnector( table_name="t1", schema_name="public", @@ -98,7 +99,7 @@ def test_provider_bind_from_bindings_supports_resource_alias() -> None: bindings = Bindings( connectors=[connector], - connector_connection=[{"connector": "people", "conn_proxy": "pg"}], + connector_connection=[{"connector": connector.hash, "conn_proxy": "pg"}], ) provider = InMemoryConnectionProvider() @@ -183,7 +184,7 @@ def test_bind_single_config_for_bindings_binds_and_validates() -> None: bindings = Bindings( connectors=[connector], - connector_connection=[{"connector": "people", "conn_proxy": "pg"}], + connector_connection=[{"connector": connector.hash, "conn_proxy": "pg"}], ) provider = InMemoryConnectionProvider() @@ -203,17 +204,19 @@ def test_bind_single_config_for_bindings_binds_and_validates() -> None: TableConnector( table_name="t1", schema_name="public", + name="c_people", resource_name="people", ), TableConnector( table_name="t2", schema_name="public", + name="c_products", resource_name="products", ), ], connector_connection=[ - {"connector": "people", "conn_proxy": "pg"}, - {"connector": "products", "conn_proxy": "pg2"}, + {"connector": "c_people", "conn_proxy": "pg"}, + {"connector": "c_products", "conn_proxy": "pg2"}, ], ) provider_mismatch = InMemoryConnectionProvider() diff --git a/test/architecture/test_manifest_canonical_contract.py b/test/architecture/test_manifest_canonical_contract.py index 21aabb7a..d5ea3060 100644 --- a/test/architecture/test_manifest_canonical_contract.py +++ b/test/architecture/test_manifest_canonical_contract.py @@ -102,7 +102,7 @@ def test_registry_builder_strict_mode_aggregates_missing_connectors() -> None: assert False, "Expected strict registry build to fail" except ValueError as exc: assert "Registry build failed in strict mode" in str(exc) - assert "No resource type found for resource 'r1'" in str(exc) + assert "No connectors bound for resource 'r1'" in str(exc) def test_resource_finish_init_does_not_mutate_shared_schema_edge_config() -> None: diff --git a/test/architecture/test_resource_filters.py b/test/architecture/test_resource_filters.py index 51096d01..0e21ad0c 100644 --- a/test/architecture/test_resource_filters.py +++ b/test/architecture/test_resource_filters.py @@ -8,6 +8,7 @@ from __future__ import annotations +import pytest from graflo.filter.onto import ComparisonOperator, FilterExpression, LogicalOperator from graflo.onto import ExpressionFlavor @@ -339,7 +340,7 @@ def test_enrichment_adds_joins(self): schema, ingestion_model, bindings = self._make_schema_and_patterns() resource = ingestion_model.fetch_resource("abc_relations") - connector = bindings.get_connector_for_resource("abc_relations") + connector = bindings.get_connectors_for_resource("abc_relations")[0] assert isinstance(connector, TableConnector) enrich_edge_connector_with_joins( @@ -361,7 +362,7 @@ def test_enrichment_adds_is_not_null_filters(self): schema, ingestion_model, bindings = self._make_schema_and_patterns() resource = ingestion_model.fetch_resource("abc_relations") - connector = bindings.get_connector_for_resource("abc_relations") + connector = bindings.get_connectors_for_resource("abc_relations")[0] assert isinstance(connector, TableConnector) enrich_edge_connector_with_joins( @@ -381,7 +382,7 @@ def test_enrichment_noop_when_joins_already_set(self): schema, ingestion_model, bindings = self._make_schema_and_patterns() resource = ingestion_model.fetch_resource("abc_relations") - connector = bindings.get_connector_for_resource("abc_relations") + connector = bindings.get_connectors_for_resource("abc_relations")[0] assert isinstance(connector, TableConnector) connector.joins = [JoinClause(table="x", alias="x", on_self="a", on_other="b")] @@ -401,7 +402,7 @@ def test_full_query_after_enrichment(self): schema, ingestion_model, bindings = self._make_schema_and_patterns() resource = ingestion_model.fetch_resource("abc_relations") - connector = bindings.get_connector_for_resource("abc_relations") + connector = bindings.get_connectors_for_resource("abc_relations")[0] assert isinstance(connector, TableConnector) enrich_edge_connector_with_joins( @@ -415,3 +416,22 @@ def test_full_query_after_enrichment(self): assert "LEFT JOIN" in q assert "IS NOT NULL" in q assert '"sn"."cmdb_rel_ci"' in q + + def test_enrichment_raises_when_vertex_has_multiple_sql_sources(self): + from graflo.hq.auto_join import enrich_edge_connector_with_joins + + schema, ingestion_model, bindings = self._make_schema_and_patterns() + extra = TableConnector( + name="server_alt", table_name="classes_alt", schema_name="sn" + ) + bindings.add_connector(extra) + bindings.bind_resource("server", extra) + resource = ingestion_model.fetch_resource("abc_relations") + connector = bindings.get_connectors_for_resource("abc_relations")[0] + with pytest.raises(ValueError, match="Multiple TableConnectors"): + enrich_edge_connector_with_joins( + resource=resource, + connector=connector, + bindings=bindings, + vertex_config=schema.core_schema.vertex_config, + ) diff --git a/test/data_source/sparql/test_rdf_inference.py b/test/data_source/sparql/test_rdf_inference.py index 8d6b88e2..27ca8628 100644 --- a/test/data_source/sparql/test_rdf_inference.py +++ b/test/data_source/sparql/test_rdf_inference.py @@ -68,8 +68,8 @@ def test_create_bindings(self, sample_ontology_path: Path): mgr = RdfInferenceManager() bindings = mgr.create_bindings(sample_ontology_path) - person_pat = bindings.get_connector_for_resource("Person") - org_pat = bindings.get_connector_for_resource("Organization") + person_pat = bindings.get_connectors_for_resource("Person")[0] + org_pat = bindings.get_connectors_for_resource("Organization")[0] assert isinstance(person_pat, SparqlConnector) assert isinstance(org_pat, SparqlConnector) assert person_pat.rdf_class == "http://example.org/Person" @@ -82,7 +82,7 @@ def test_create_bindings_with_endpoint(self, sample_ontology_path: Path): bindings = mgr.create_bindings(sample_ontology_path, endpoint_url=endpoint) for resource_name in ("Person", "Organization"): - pat = bindings.get_connector_for_resource(resource_name) + pat = bindings.get_connectors_for_resource(resource_name)[0] assert isinstance(pat, SparqlConnector) assert pat.endpoint_url == endpoint assert pat.rdf_file is None diff --git a/test/db/postgres/test_ingest_datetime_range.py b/test/db/postgres/test_ingest_datetime_range.py index b812487b..140b15e3 100644 --- a/test/db/postgres/test_ingest_datetime_range.py +++ b/test/db/postgres/test_ingest_datetime_range.py @@ -43,16 +43,16 @@ def test_datetime_columns_sets_date_field_on_connectors(conn_conf, load_mock_sch "users": "created_at", }, ) - purchases = connectors.get_connector_for_resource("purchases") - users = connectors.get_connector_for_resource("users") + purchases = connectors.get_connectors_for_resource("purchases")[0] + users = connectors.get_connectors_for_resource("users")[0] assert isinstance(purchases, TableConnector) assert isinstance(users, TableConnector) assert purchases.date_field == "purchase_date" assert users.date_field == "created_at" # Tables not in the map have no date_field - follows = connectors.get_connector_for_resource("follows") - if isinstance(follows, TableConnector): - assert follows.date_field is None + follows_list = connectors.get_connectors_for_resource("follows") + if follows_list and isinstance(follows_list[0], TableConnector): + assert follows_list[0].date_field is None def test_ingest_datetime_range_postgres(postgres_conn, load_mock_schema): diff --git a/test/test_filter_view.py b/test/test_filter_view.py index b209b428..46a9657c 100644 --- a/test/test_filter_view.py +++ b/test/test_filter_view.py @@ -601,7 +601,7 @@ def test_type_lookup_overrides_sets_view_on_connector(self): }, ) - tp = bindings.get_connector_for_resource("entity_links") + tp = bindings.get_connectors_for_resource("entity_links")[0] assert isinstance(tp, TableConnector) assert tp.view is not None assert tp.view.kind == "type_lookup" diff --git a/test/test_patterns.py b/test/test_patterns.py index 3334640f..c476fdb0 100644 --- a/test/test_patterns.py +++ b/test/test_patterns.py @@ -3,6 +3,7 @@ from graflo.architecture.contract.bindings import ( Bindings, + BoundSourceKind, FileConnector, ResourceConnectorBinding, TableConnector, @@ -23,8 +24,11 @@ def test_connectors(): bindings.bind_resource("b", connector_b) # Test that connectors work correctly (narrow to FileConnector for .sub_path) - connector_a_loaded = bindings.get_connector_for_resource("a") - connector_b_loaded = bindings.get_connector_for_resource("b") + conns_a = bindings.get_connectors_for_resource("a") + conns_b = bindings.get_connectors_for_resource("b") + assert len(conns_a) == 1 and len(conns_b) == 1 + connector_a_loaded = conns_a[0] + connector_b_loaded = conns_b[0] assert isinstance(connector_a_loaded, FileConnector) assert isinstance(connector_b_loaded, FileConnector) assert connector_a_loaded.sub_path is not None @@ -32,11 +36,8 @@ def test_connectors(): assert connector_b_loaded.sub_path is not None assert str(connector_b_loaded.sub_path / "a") == "a" - # Test that connectors can be accessed by name - assert bindings.get_connector_for_resource("a") is not None - assert bindings.get_connector_for_resource("b") is not None - assert bindings.get_resource_type("a") == "file" - assert bindings.get_resource_type("b") == "file" + assert connector_a_loaded.bound_source_kind() == BoundSourceKind.FILE + assert connector_b_loaded.bound_source_kind() == BoundSourceKind.FILE def test_file_connector_basic(): @@ -237,8 +238,8 @@ def test_connectors_with_filtering(): bindings.bind_resource("events", table_connector) # Verify connectors are stored correctly (narrow with isinstance checks) - users_pattern = bindings.get_connector_for_resource("users") - events_pattern = bindings.get_connector_for_resource("events") + users_pattern = bindings.get_connectors_for_resource("users")[0] + events_pattern = bindings.get_connectors_for_resource("events")[0] assert isinstance(users_pattern, FileConnector) assert users_pattern.regex == r".*\.csv$" assert isinstance(events_pattern, TableConnector) @@ -331,9 +332,10 @@ def test_bindings_support_connector_resource_name_mapping(): ) ] ) - connector = bindings.get_connector_for_resource("users") - assert isinstance(connector, FileConnector) - assert bindings.get_resource_type("users") == "file" + conns = bindings.get_connectors_for_resource("users") + assert len(conns) == 1 + assert isinstance(conns[0], FileConnector) + assert conns[0].bound_source_kind() == BoundSourceKind.FILE def test_bindings_support_top_level_resource_connector_objects(): @@ -347,33 +349,53 @@ def test_bindings_support_top_level_resource_connector_objects(): ResourceConnectorBinding(resource="orders", connector="orders_table") ], ) - connector = bindings.get_connector_for_resource("orders") - assert isinstance(connector, TableConnector) - assert connector.table_name == "orders" - assert connector.schema_name == "public" + conns = bindings.get_connectors_for_resource("orders") + assert len(conns) == 1 + assert isinstance(conns[0], TableConnector) + assert conns[0].table_name == "orders" + assert conns[0].schema_name == "public" -def test_bindings_reject_conflicting_resource_mappings(): - with pytest.raises(ValueError, match="Conflicting resource binding"): +def test_bindings_allow_multiple_resource_connector_mappings(): + bindings = Bindings( + connectors=[ + FileConnector( + name="users_files", + regex=r"^users.*\.csv$", + sub_path=pathlib.Path("."), + resource_name="users", + ), + FileConnector( + name="users_backup_files", + regex=r"^users_backup.*\.csv$", + sub_path=pathlib.Path("."), + ), + ], + resource_connector=[ + ResourceConnectorBinding(resource="users", connector="users_backup_files") + ], + ) + conns = bindings.get_connectors_for_resource("users") + assert len(conns) == 2 + by_name = {c.name for c in conns if isinstance(c, FileConnector)} + assert by_name == {"users_files", "users_backup_files"} + + +def test_bindings_connector_connection_rejects_resource_key_as_connector_ref(): + """connector_connection.connector must name a connector (name or hash), not a resource.""" + with pytest.raises(ValueError, match="Unknown connector reference"): Bindings( connectors=[ FileConnector( - name="users_files", - regex=r"^users.*\.csv$", - sub_path=pathlib.Path("."), - resource_name="users", - ), - FileConnector( - name="users_backup_files", - regex=r"^users_backup.*\.csv$", + name="openalex", + regex=r".*\.jsonl$", sub_path=pathlib.Path("."), - ), + ) ], resource_connector=[ - ResourceConnectorBinding( - resource="users", connector="users_backup_files" - ) + ResourceConnectorBinding(resource="work", connector="openalex") ], + connector_connection=[{"connector": "work", "conn_proxy": "main"}], ) @@ -388,9 +410,10 @@ def test_bindings_resource_connector_accepts_dict_entries(): ], resource_connector=[{"resource": "work", "connector": "openalex"}], ) - connector = bindings.get_connector_for_resource("work") - assert isinstance(connector, FileConnector) - assert connector.name == "openalex" + conns = bindings.get_connectors_for_resource("work") + assert len(conns) == 1 + assert isinstance(conns[0], FileConnector) + assert conns[0].name == "openalex" def test_bindings_resource_connector_validation_error_message(): From 69d6a305d82c6ce5d02173c3d3f4437decbd2813 Mon Sep 17 00:00:00 2001 From: Alexander Belikov Date: Wed, 1 Apr 2026 00:30:25 +0200 Subject: [PATCH 2/4] improved max_items --- CHANGELOG.md | 28 +++- README.md | 2 +- docs/concepts/index.md | 5 +- docs/examples/example-9.md | 4 +- docs/getting_started/creating_manifest.md | 6 +- docs/getting_started/quickstart.md | 2 +- .../9-connector-connection-proxy/README.md | 1 + .../explicit_proxy_binding.py | 9 +- graflo/data_source/api.py | 23 +-- graflo/data_source/base.py | 7 +- graflo/data_source/rdf.py | 137 +++++++++++++----- graflo/data_source/sql.py | 22 +-- graflo/hq/caster.py | 1 + graflo/hq/ingestion_parameters.py | 23 ++- pyproject.toml | 2 +- uv.lock | 2 +- 16 files changed, 196 insertions(+), 78 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3970fee6..f9c2b3fc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,30 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.7.9] - 2026-03-31 + +### Added + +- **`Bindings.get_connectors_for_resource(name)`** returns an ordered list of connectors (unique by hash) for an ingestion resource, supporting **1→n** resource–connector wiring. +- **`BoundSourceKind`** enum (`file`, `sql_table`, `sparql`) and **`ResourceConnector.bound_source_kind()`** describe the physical source modality of a connector (replacing the old “resource type” wording). + +### Changed + +- **Ingestion caps**: `IngestionParams.max_items` is documented and validated (`>= 1` when set). **`SparqlEndpointDataSource.iter_batches`** paginates without loading the full endpoint result into memory, uses **`ORDER BY ?s`** when the query has no `ORDER BY`, and honors **`limit`** as a subject count. **`SQLDataSource`** and offset/page **API** pagination pass a tighter per-request page size when a total cap is close (fewer over-fetched rows/items). +- **`RegistryBuilder`** registers **every** connector bound to each resource and dispatches on **`connector.bound_source_kind()`**; SQL registration uses the connector’s own table/schema fields instead of a resource-level table lookup. +- **Auto-join** (`_vertex_table_info`) resolves table metadata via the list API and **raises** if more than one `TableConnector` is bound to the same vertex/resource key used for disambiguation. + +### Breaking + +- **`ResourceType`** removed in favor of **`BoundSourceKind`**; **`get_resource_type()`** removed in favor of **`bound_source_kind()`** on connectors (update imports and call sites). +- **`Bindings`**: **`get_connector_for_resource`**, **`get_resource_type`**, and **`get_table_info`** removed; use **`get_connectors_for_resource`** and connector fields / `bound_source_kind()` instead. +- **`connector_connection` / internal connector refs**: resolution allows only **connector `name`** or **canonical `hash`**. Using an ingestion **resource name** as a `connector` reference is no longer supported (resource names are no longer 1:1 with connectors). +- **`bind_resource`** and manifest **`resource_connector`** validation: additional rows for the same `resource` append connectors instead of replacing or conflicting. + +### Documentation + +- **Examples / docs**: `examples/9-connector-connection-proxy` and manifest guides updated for explicit connector names in `connector_connection`. Concepts and README clarify 1→n bindings and proxy wiring. + ## [1.7.7] - 2026-03-27 ### Changed @@ -20,8 +44,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **`Bindings.connector_connection_bindings`** (typed view), **`get_conn_proxy_for_connector`**, and **`bind_connector_to_conn_proxy`**: API aligned with HQ loaders (`ResourceMapper`, `GraphEngine`) for proxy-based source wiring. ### Changed -- **Connector reference resolution**: `connector_connection` entries may reference a connector by canonical **hash**, declared **`name`**, or a **`resource` name** when that resource is already mapped to the connector (mirrors validation in `Bindings`). -- **`Bindings` validation**: duplicate connector `name` values, conflicting resource→connector mappings, and conflicting `conn_proxy` for the same connector hash now fail fast with explicit errors. +- **Connector reference resolution**: `connector_connection` entries may reference a connector by canonical **hash**, declared **`name`**, or a **`resource` name** when that resource is already mapped to the connector (mirrors validation in `Bindings`). **Update (1.7.8):** resource-name aliasing for `connector` refs was removed; use **connector `name` or `hash`** only. +- **`Bindings` validation**: duplicate connector `name` values and conflicting `conn_proxy` for the same connector hash now fail fast with explicit errors. **Update (1.7.8):** many connectors may attach to the same ingestion resource (1→n); overlapping resource rows no longer raise “conflicting resource binding” for distinct connectors. ### Breaking - **`Bindings.from_dict` / manifest validation**: legacy top-level keys `postgres_connections`, `table_connectors`, `file_connectors`, and `sparql_connectors` are rejected. Migrate to the unified `connectors` + `resource_connector` (+ optional `connector_connection`) shape. diff --git a/README.md b/README.md index 8ed877df..c5c65915 100644 --- a/README.md +++ b/README.md @@ -76,7 +76,7 @@ ArangoDB, Neo4j, TigerGraph, FalkorDB, Memgraph, NebulaGraph — same API for al - **Schema inference** — Generate graph schemas from PostgreSQL 3NF databases (PK/FK heuristics) or from OWL/RDFS ontologies (`owl:Class` → vertices, `owl:ObjectProperty` → edges, `owl:DatatypeProperty` → vertex fields). - **Typed fields** — Vertex fields and edge weights carry types (`INT`, `FLOAT`, `STRING`, `DATETIME`, `BOOL`) for validation and database-specific optimisation. - **Parallel batch processing** — Configurable batch sizes and multi-core execution. -- **Credential-free source contracts** — `Bindings.connector_connection` maps each `TableConnector` / `SparqlConnector` (by name, hash, or resource alias) to a `conn_proxy` label. Manifests stay free of secrets; a runtime `ConnectionProvider` resolves each proxy to concrete `GeneralizedConnConfig` (for example PostgreSQL or SPARQL endpoint settings). +- **Credential-free source contracts** — `Bindings.connector_connection` maps each `TableConnector` / `SparqlConnector` (by **connector name** or **hash**) to a `conn_proxy` label. Manifests stay free of secrets; a runtime `ConnectionProvider` resolves each proxy to concrete `GeneralizedConnConfig` (for example PostgreSQL or SPARQL endpoint settings). Ingestion resource names are separate and may map to multiple connectors. ## Documentation Full documentation is available at: [growgraph.github.io/graflo](https://growgraph.github.io/graflo) diff --git a/docs/concepts/index.md b/docs/concepts/index.md index 5a066352..01ba19a2 100644 --- a/docs/concepts/index.md +++ b/docs/concepts/index.md @@ -46,7 +46,7 @@ flowchart LR - **GraphManifest** — the canonical top-level contract that composes `schema`, `ingestion_model`, and `bindings`. - **Schema** — the declarative logical graph model (`Schema`): vertex/edge definitions, identities, typed fields, and DB profile. - **IngestionModel** — reusable resources and transforms used to map records into graph entities. -- **Bindings** — named `FileConnector` / `TableConnector` / `SparqlConnector` list plus `resource_connector` (resource→connector) and optional `connector_connection` (connector→`conn_proxy` for runtime `ConnectionProvider` resolution without secrets in the manifest). +- **Bindings** — named `FileConnector` / `TableConnector` / `SparqlConnector` list plus `resource_connector` (many rows per resource allowed: resource→0..n connectors) and optional `connector_connection` (connector **name** or **hash**→`conn_proxy` for runtime `ConnectionProvider` resolution without secrets in the manifest). Each connector exposes a **bound source modality** (`BoundSourceKind`: file, SQL table, SPARQL) for dispatch, distinct from the abstract ingestion **Resource**. - **Database-Independent Graph Representation** — a `GraphContainer` of vertices and edges, independent of any target database. - **Graph DB** — the target LPG store (ArangoDB, Neo4j, TigerGraph, FalkorDB, Memgraph, NebulaGraph). @@ -94,7 +94,7 @@ flowchart LR Res --> Ex --> Asm --> GC --> DBW ``` -- **Bindings** (`FileConnector`, `TableConnector`, `SparqlConnector`) describe *where* data comes from (file paths, SQL tables, SPARQL endpoints). Optional **`connector_connection`** entries assign each SQL/SPARQL connector a **`conn_proxy`** label; the `ConnectionProvider` turns that label into real connection config at runtime so manifests stay credential-free. +- **Bindings** (`FileConnector`, `TableConnector`, `SparqlConnector`) describe *where* data comes from (file paths, SQL tables, SPARQL endpoints). Multiple connectors may attach to the same ingestion resource name; optional **`connector_connection`** entries assign each SQL/SPARQL connector a **`conn_proxy`** by **connector `name` or `hash`** (not by resource name). The `ConnectionProvider` turns that label into real connection config at runtime so manifests stay credential-free. - **DataSources** (`AbstractDataSource` subclasses) handle *how* to read data in batches. Each carries a `DataSourceType` and is registered in the `DataSourceRegistry`. - **Resources** define *what* to extract — each `Resource` is a reusable actor pipeline (descend → transform → vertex → edge) that maps raw records to graph elements. - **GraphContainer** (covariant graph representation) collects the resulting vertices and edges in a database-independent format. @@ -176,6 +176,7 @@ classDiagram +connectors: list~ResourceConnector~ +resource_connector: list~ResourceConnectorBinding~ +connector_connection: list~ConnectorConnectionBinding~ + +get_connectors_for_resource(name) list +get_conn_proxy_for_connector(connector) str? +bind_connector_to_conn_proxy(connector, conn_proxy) } diff --git a/docs/examples/example-9.md b/docs/examples/example-9.md index 8c6f5997..c8e90705 100644 --- a/docs/examples/example-9.md +++ b/docs/examples/example-9.md @@ -8,7 +8,7 @@ The manifest stays credential-free: `bindings.connector_connection` only contain ## Manifest: what `connector_connection` looks like -Inside `bindings` you explicitly map each connector to a proxy label: +Inside `bindings` you explicitly map each connector to a proxy label. The `connector` field must be a **connector `name`** or **canonical hash**, not an ingestion resource name (a resource may be bound to several connectors). ```yaml bindings: @@ -23,7 +23,7 @@ bindings: conn_proxy: postgres_source ``` -In the code, connectors omit `connector.name` and use `connector.resource_name` (so the manifest references are stable and human-readable). +In the companion script, each `TableConnector` sets `name` to match those references (here they match the table/resource names only for readability). ## Runtime: how the proxy label becomes a real DB config diff --git a/docs/getting_started/creating_manifest.md b/docs/getting_started/creating_manifest.md index 04512145..86cc8535 100644 --- a/docs/getting_started/creating_manifest.md +++ b/docs/getting_started/creating_manifest.md @@ -82,10 +82,10 @@ Use `ingestion_model` for **how source records become vertices/edges**. Defines source wiring (`Bindings`). - **`connectors`**: list of `FileConnector`, `TableConnector`, or `SparqlConnector` entries (where each row points at paths, tables, or RDF/SPARQL sources). -- **`resource_connector`**: list of `{"resource": "", "connector": ""}` rows linking `IngestionModel.resources[*].name` to a connector. -- **`connector_connection`** (optional): list of `{"connector": "", "conn_proxy": "