GraFlo is a Graph Schema & Transformation Language (GSTL) for labeled property graphs (LPGs). You describe the graph once—vertices and edges, typed properties, identity, and optional backend hints—in YAML or Python. You describe how raw records become that graph using resource pipelines (an expressive sequence of actors: descend, transform, vertex, edge, and routers). Connectors attach files, SQL tables, SPARQL/RDF, APIs, or in-memory data to those pipelines. GraphEngine and Caster then infer schema when possible, project the logical model for a chosen database, and ingest.
Why it matters: the logical graph is database-agnostic; the same manifest can target ArangoDB, Neo4j, TigerGraph, FalkorDB, Memgraph, or NebulaGraph without rewriting your transformation story. Backend-specific names, defaults, and indexes are applied only at DB-aware projection (`Schema.resolve_db_aware(...)`).
Package Renamed: This package was formerly known as `graphcast`.
| Idea | What you get |
|---|---|
| Logical LPG first | One declarative schema (Vertex / Edge with properties) is the source of truth—not a particular vendor’s DDL. |
| Expressive transformation | Resource pipelines compose small actors so wide tables, nested JSON, RDF, or API payloads map cleanly to vertices and edges—reusable across sources. |
| Separation of concerns | Sources (connectors + DataSourceRegistry), shape of the graph (Schema), and ingestion steps (IngestionModel) evolve independently. |
| Safe wiring | Optional connector_connection maps connectors to conn_proxy labels so manifests stay free of secrets; a runtime ConnectionProvider supplies credentials. |
GraFlo separates what the graph looks like from where data comes from and which database stores it.
```mermaid
%%{ init: {
    "theme": "base",
    "themeVariables": {
        "primaryColor": "#90CAF9",
        "primaryTextColor": "#111111",
        "primaryBorderColor": "#1E88E5",
        "lineColor": "#546E7A",
        "secondaryColor": "#A5D6A7",
        "tertiaryColor": "#CE93D8"
    }
} }%%
flowchart LR
    SI["<b>Source Instance</b><br/>File · SQL · SPARQL · API"]
    R["<b>Resource</b><br/>Actor Pipeline"]
    GS["<b>Logical Graph Schema</b><br/>Vertex/Edge Definitions<br/>Identities · DB Profile"]
    DBA["<b>DB-aware Projection</b><br/>DatabaseProfile<br/>VertexConfigDBAware · EdgeConfigDBAware"]
    GC["<b>GraphContainer</b><br/>Covariant Graph Representation"]
    DB["<b>Graph DB (LPG)</b><br/>ArangoDB · Neo4j · TigerGraph · Others"]
    SI --> R --> GS --> GC --> DBA --> DB
```
| Stage | Role | Code |
|---|---|---|
| Source Instance | A concrete data artifact — a CSV file, a PostgreSQL table, a SPARQL endpoint, a `.ttl` file. | `AbstractDataSource` subclasses (`FileDataSource`, `SQLDataSource`, `SparqlEndpointDataSource`, …) with a `DataSourceType`. |
| Resource | A reusable transformation pipeline — actor steps (descend, transform, vertex, edge, vertex_router, edge_router) that map raw records to graph elements. Data sources bind to Resources by name via the `DataSourceRegistry`. | `Resource` (part of `IngestionModel`). |
| Graph Schema | Declarative logical vertex/edge definitions, identities, typed properties, and DB profile — defined in YAML or Python. | `Schema`, `VertexConfig`, `EdgeConfig`. |
| Covariant Graph Representation | A database-independent collection of vertices and edges. | `GraphContainer`. |
| DB-aware Projection | Resolves DB-specific naming/default/index behavior from the logical schema + `DatabaseProfile`. | `Schema.resolve_db_aware()`, `VertexConfigDBAware`, `EdgeConfigDBAware`. |
| Graph DB | The target LPG store — same API for all supported databases. | `ConnectionManager`, `DBWriter`, DB connectors. |
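The Resource stage composes small actors over raw records. As a plain-Python illustration of the idea only (not graflo's actual API; the record shapes, field names, and labels below are made up), a descend-style step can unfold a nested field so downstream steps emit vertices and edges:

```python
# Illustrative actor-pipeline sketch: a "descend" actor unfolds a nested list
# field, and a downstream step turns each (parent, child) pair into vertices
# and an edge. Hypothetical data; not graflo code.

def descend(records, key):
    """Yield (parent, child) pairs for a nested list field."""
    for rec in records:
        for child in rec.get(key, []):
            yield rec, child

def make_vertices_and_edges(records):
    vertices, edges = [], []
    for work, author in descend(records, "authors"):
        vertices.append({"label": "Work", "id": work["doi"]})
        vertices.append({"label": "Author", "id": author["orcid"]})
        edges.append({"label": "AUTHORED_BY", "src": work["doi"], "dst": author["orcid"]})
    return vertices, edges

records = [{"doi": "10.1/x", "authors": [{"orcid": "0000-0001"}]}]
vertices, edges = make_vertices_and_edges(records)
```

In graflo itself, such steps are declared as actors inside a `Resource` rather than written by hand.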
| DataSourceType | Connector | DataSource | Schema inference |
|---|---|---|---|
| FILE — CSV / JSON / JSONL / Parquet | `FileConnector` | `FileDataSource` | manual |
| SQL — PostgreSQL tables | `TableConnector` | `SQLDataSource` | automatic (3NF with PK/FK) |
| SPARQL — RDF files (`.ttl`, `.rdf`, `.n3`) | `SparqlConnector` | `RdfFileDataSource` | automatic (OWL/RDFS ontology) |
| SPARQL — SPARQL endpoints (Fuseki, …) | `SparqlConnector` | `SparqlEndpointDataSource` | automatic (OWL/RDFS ontology) |
| API — REST APIs | — | `APIDataSource` | manual |
| IN_MEMORY — list / DataFrame | — | `InMemoryDataSource` | manual |
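For FILE sources, a connector selects its inputs by matching file names against a regex (as in the `FileConnector` examples later in this README). A minimal stdlib illustration of that selection step, with hypothetical file names and pattern:

```python
import re

# Hypothetical regex-based file selection, mimicking what a file connector does.
# The pattern and file names are made up for illustration.
pattern = re.compile(r"\.jsonl?$")  # match .json or .jsonl suffixes
files = ["works.json", "authors.jsonl", "notes.txt"]
selected = [f for f in files if pattern.search(f)]
```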
ArangoDB, Neo4j, TigerGraph, FalkorDB, Memgraph, NebulaGraph — same API for all.
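The same logical schema is projected differently per backend at DB-aware projection time. The following toy sketch conveys the idea only; the naming conventions and dictionary shapes below are illustrative assumptions, not graflo's actual projection logic:

```python
# Toy "DB-aware projection": the logical schema stays database-agnostic, and
# backend-specific naming rules are applied only at projection time.
LOGICAL = {"vertices": ["Author", "Work"], "edges": [("Author", "authored", "Work")]}

def project(logical, flavor):
    if flavor == "arango":
        # e.g. one collection per vertex label, one edge collection per edge type
        return {
            "vertex_collections": list(logical["vertices"]),
            "edge_collections": [f"{s.lower()}_{e}_{t.lower()}" for s, e, t in logical["edges"]],
        }
    if flavor == "neo4j":
        # Neo4j keeps labels and relationship types; rel types are conventionally upper-case
        return {
            "labels": list(logical["vertices"]),
            "relationship_types": [e.upper() for _, e, _ in logical["edges"]],
        }
    raise ValueError(f"unsupported flavor: {flavor}")
```

In graflo this resolution is performed by `Schema.resolve_db_aware(...)` together with a `DatabaseProfile`.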
- Declarative LPG schema — Define vertices, edges, vertex identity, secondary DB indexes, edge properties, and transforms in YAML or Python. The `Schema` is the single source of truth, independent of source or target.
- Database abstraction — One logical schema, one API. Target ArangoDB, Neo4j, TigerGraph, FalkorDB, Memgraph, or NebulaGraph without rewriting pipelines. DB idiosyncrasies are handled in DB-aware projection (`Schema.resolve_db_aware(...)`) and connector/writer stages.
- Resource abstraction — Each `Resource` defines a reusable actor pipeline (descend, transform, vertex, edge, plus `VertexRouter` and `EdgeRouter` for dynamic type-based routing) that maps raw records to graph elements. Data sources bind to Resources by name via the `DataSourceRegistry`, decoupling transformation logic from data retrieval.
- SPARQL & RDF support — Query SPARQL endpoints (e.g. Apache Fuseki), read `.ttl`/`.rdf`/`.n3` files, and auto-infer schemas from OWL/RDFS ontologies (`rdflib` and `SPARQLWrapper` ship with the default package).
- Schema inference — Generate graph schemas from PostgreSQL 3NF databases (PK/FK heuristics) or from OWL/RDFS ontologies (`owl:Class` → vertices, `owl:ObjectProperty` → edges, `owl:DatatypeProperty` → vertex properties).
- Typed properties — Vertex and edge `properties` may carry types (`INT`, `FLOAT`, `STRING`, `DATETIME`, `BOOL`) for validation and database-specific optimisation.
- Parallel batch processing — Configurable batch sizes and multi-core execution.
- Credential-free source contracts — `Bindings.connector_connection` maps each `TableConnector`/`SparqlConnector` (by connector name or hash) to a `conn_proxy` label. Manifests stay free of secrets; a runtime `ConnectionProvider` resolves each proxy to a concrete `GeneralizedConnConfig` (for example PostgreSQL or SPARQL endpoint settings). Ingestion resource names are separate and may map to multiple connectors.
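Typed properties enable validation before anything reaches the database. A minimal sketch of what such a check could look like, assuming the type tags listed above (the checker itself is illustrative, not graflo's validator):

```python
from datetime import datetime

# Illustrative type checks keyed by the property-type tags mentioned above.
CHECKS = {
    "INT": lambda v: isinstance(v, int) and not isinstance(v, bool),
    "FLOAT": lambda v: isinstance(v, float),
    "STRING": lambda v: isinstance(v, str),
    "BOOL": lambda v: isinstance(v, bool),
    "DATETIME": lambda v: isinstance(v, datetime),
}

def validate(props, spec):
    """Return the property names whose values do not match the declared type."""
    return [k for k, t in spec.items() if k in props and not CHECKS[t](props[k])]

spec = {"year": "INT", "title": "STRING"}
errors = validate({"year": "1999", "title": "Dune"}, spec)  # "year" fails: str, not int
```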
Full documentation is available at: growgraph.github.io/graflo
```shell
pip install graflo
```

Optional extras (see `pyproject.toml` → `[project.optional-dependencies]`):

- `dev` — pytest, ty, pre-commit
- `docs` — MkDocs stack for building the documentation site
- `plot` — `pygraphviz` for the `plot_manifest` CLI (install system Graphviz first)

```shell
pip install "graflo[dev]"
pip install "graflo[dev,docs,plot]"
```

```python
from suthing import FileHandle

from graflo import Bindings, GraphManifest
from graflo.db.connection.onto import ArangoConfig

manifest = GraphManifest.from_config(FileHandle.load("schema.yaml"))
manifest.finish_init()
schema = manifest.require_schema()
ingestion_model = manifest.require_ingestion_model()

# Option 1: Load config from docker/arango/.env (recommended)
conn_conf = ArangoConfig.from_docker_env()

# Option 2: Load from environment variables
# Set: ARANGO_URI, ARANGO_USERNAME, ARANGO_PASSWORD, ARANGO_DATABASE
conn_conf = ArangoConfig.from_env()

# Option 3: Load with a custom prefix (for multiple configs)
# Set: USER_ARANGO_URI, USER_ARANGO_USERNAME, USER_ARANGO_PASSWORD, USER_ARANGO_DATABASE
user_conn_conf = ArangoConfig.from_env(prefix="USER")

# Option 4: Create the config directly
# conn_conf = ArangoConfig(
#     uri="http://localhost:8535",
#     username="root",
#     password="123",
#     database="mygraph",  # For ArangoDB, 'database' maps to schema/graph
# )

# Note: If 'database' (or 'schema_name' for TigerGraph) is not set,
# Caster will automatically use Schema.metadata.name as a fallback

from graflo.architecture.contract.bindings import FileConnector
import pathlib

# Create Bindings with file connectors
bindings = Bindings()
work_connector = FileConnector(regex=r"\Sjson$", sub_path=pathlib.Path("./data"))
bindings.add_connector(work_connector)
bindings.bind_resource("work", work_connector)

# Or initialize via connectors + resource_connector
# bindings = Bindings(
#     connectors=[
#         FileConnector(
#             name="work_files",
#             regex=r"^work\.json$",
#             sub_path=pathlib.Path("./data"),
#         )
#     ],
#     resource_connector=[{"resource": "work", "connector": "work_files"}],
#     # Optional: for SQL/SPARQL connectors, name a proxy; register secrets via ConnectionProvider.
#     # connector_connection=[{"connector": "work_files", "conn_proxy": "files_readonly"}],
# )

from graflo.hq.caster import IngestionParams
from graflo.hq import GraphEngine

# Option 1: Use GraphEngine for schema definition and ingestion (recommended)
engine = GraphEngine()
ingestion_params = IngestionParams(
    clear_data=False,
    # max_items=1000,    # Optional: limit the number of items to process
    # batch_size=10000,  # Optional: customize the batch size
)
ingest_manifest = manifest.model_copy(update={"bindings": bindings})
ingest_manifest.finish_init()
engine.define_and_ingest(
    manifest=ingest_manifest,
    target_db_config=conn_conf,  # Target database config
    ingestion_params=ingestion_params,
    recreate_schema=False,  # Set to True to drop and redefine the schema (the script halts if the schema exists)
)

# Option 2: Use Caster directly (the schema must be defined separately)
# from graflo.hq import GraphEngine
# engine = GraphEngine()
# engine.define_schema(manifest=manifest, target_db_config=conn_conf, recreate_schema=False)
#
# caster = Caster(schema=schema, ingestion_model=ingestion_model)
# caster.ingest(
#     target_db_config=conn_conf,
#     bindings=bindings,
#     ingestion_params=ingestion_params,
# )
```

```python
from graflo.hq import GraphEngine
from graflo.db.connection.onto import PostgresConfig, ArangoConfig
from graflo import Caster
from graflo.onto import DBType

# Connect to PostgreSQL
postgres_config = PostgresConfig.from_docker_env()  # or PostgresConfig.from_env()

# Create a GraphEngine and infer a manifest from a PostgreSQL 3NF database
# The connection is managed automatically inside infer_manifest()
engine = GraphEngine(target_db_flavor=DBType.ARANGO)
manifest = engine.infer_manifest(
    postgres_config,
    schema_name="public",  # PostgreSQL schema name
)
schema = manifest.require_schema()
ingestion_model = manifest.require_ingestion_model()

# Define the schema in the target database (optional; define_and_ingest also works)
target_config = ArangoConfig.from_docker_env()
engine.define_schema(
    manifest=manifest,
    target_db_config=target_config,
    recreate_schema=False,
)

# Use the inferred schema with Caster for ingestion
caster = Caster(schema=schema, ingestion_model=ingestion_model)
# ... continue with ingestion
```

```python
from pathlib import Path

from graflo.hq import GraphEngine
from graflo.db.connection.onto import ArangoConfig
from graflo.architecture.manifest import GraphManifest

engine = GraphEngine()

# Infer a schema from an OWL/RDFS ontology file
ontology = Path("ontology.ttl")
schema, ingestion_model = engine.infer_schema_from_rdf(source=ontology)

# Create source bindings (reads a local .ttl file per rdf:Class)
bindings = engine.create_bindings_from_rdf(source=ontology)

# Or point at a SPARQL endpoint instead:
# from graflo.db.connection.onto import SparqlEndpointConfig
# sparql_cfg = SparqlEndpointConfig(uri="http://localhost:3030", dataset="mydata")
# bindings = engine.create_bindings_from_rdf(
#     source=ontology,
#     endpoint_url=sparql_cfg.query_endpoint,
# )

target = ArangoConfig.from_docker_env()
engine.define_and_ingest(
    manifest=GraphManifest(
        graph_schema=schema,
        ingestion_model=ingestion_model,
        bindings=bindings,
    ),
    target_db_config=target,
)
```

To install the requirements:
```shell
git clone git@github.com:growgraph/graflo.git && cd graflo
uv sync --extra dev
```

Quick Start: To start all test databases at once, use the convenience scripts from the docker folder:

```shell
cd docker
./start-all.sh    # Start all services
./stop-all.sh     # Stop all services
./cleanup-all.sh  # Remove containers and volumes
```

Individual Services: To start individual databases, navigate to each database folder and run the corresponding command.

Spin up ArangoDB from the arango docker folder:

```shell
docker-compose --env-file .env up arango
```

Neo4j from the neo4j docker folder:

```shell
docker-compose --env-file .env up neo4j
```

TigerGraph from the tigergraph docker folder:

```shell
docker-compose --env-file .env up tigergraph
```

FalkorDB from the falkordb docker folder:

```shell
docker-compose --env-file .env up falkordb
```

Memgraph from the memgraph docker folder:

```shell
docker-compose --env-file .env up memgraph
```

NebulaGraph from the nebula docker folder:

```shell
docker-compose --env-file .env up
```

and Apache Fuseki from the fuseki docker folder:

```shell
docker-compose --env-file .env up fuseki
```

To run unit tests:

```shell
uv run pytest test
```

Note: Tests require external database containers (ArangoDB, Neo4j, TigerGraph, FalkorDB, Memgraph, NebulaGraph, Fuseki) to be running. CI builds intentionally skip test execution; tests must be run locally with the required database images started (see the test-database instructions above). NebulaGraph tests are gated behind `pytest --run-nebula`.
- Python 3.11+ (3.11 and 3.12 are officially supported)
- `python-arango`
- `nebula3-python>=3.8.3` (NebulaGraph v3.x support)
- `nebula5-python>=5.2.1` (NebulaGraph v5.x support)
- `sqlalchemy>=2.0.0` (for PostgreSQL and SQL data sources)
- `rdflib>=7.0.0` + `SPARQLWrapper>=2.0.0` (included in the default install)
Contributions are welcome! Please feel free to submit a Pull Request.