Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion orchestrator/metastore/sql/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,29 @@

from orchestrator.utilities.location import SQLStoreConfiguration

# Process-level cache: reuse the same SQLAlchemy Engine (and its connection pool)
# for every call with the same database URL. This means the metastore and the
# samplestore — which both point at the same MySQL server — share one pool and
# avoid the overhead of opening a second TCP connection.
_engine_cache: dict[str, sqlalchemy.Engine] = {}


def engine_for_sql_store(
configuration: SQLStoreConfiguration, database: str | None = None
) -> sqlalchemy.Engine:
"""Return a SQLAlchemy Engine for the given store configuration.

Engines are cached by their connection URL so that multiple components
connecting to the same database reuse a single connection pool rather than
each opening their own TCP connection.

Args:
configuration: Database connection parameters.
database: Optional database name override.

Returns:
A (possibly cached) SQLAlchemy Engine.
"""
if configuration is None:
raise ValueError("engine_for_sql_store requires a valid SQLStoreConfiguration")

Expand All @@ -27,6 +46,10 @@ def engine_for_sql_store(
if configuration.scheme == "sqlite"
else configuration.url().unicode_string()
)

if db_location in _engine_cache:
return _engine_cache[db_location]

engine_args: dict = {"echo": False}
if configuration.scheme != "sqlite":
# Prevent "Lost connection to MySQL server during query" (error 2013) when
Expand All @@ -39,7 +62,10 @@ def engine_for_sql_store(
# Other components on the connection also may close the connection at
# other unknown intervals
# engine_args["pool_recycle"] = 1800
return sqlalchemy.create_engine(db_location, **engine_args)

engine = sqlalchemy.create_engine(db_location, **engine_args)
_engine_cache[db_location] = engine
return engine


def create_sql_resource_store(engine: sqlalchemy.Engine) -> sqlalchemy.Engine:
Expand Down