From f2bc40bf3f1978933c36dcf808ea304fbe7d473e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AF=D0=BA=D0=B8=D0=BC=D0=B5=D0=BD=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=90=D0=BB=D0=B5=D0=BA?= =?UTF-8?q?=D1=81=D0=B0=D0=BD=D0=B4=D1=80=D0=BE=D0=B2=D0=B8=D1=87?= Date: Wed, 18 Mar 2026 17:36:51 +0300 Subject: [PATCH 1/8] [DOP-34705] add recursive query for job_dependencies --- .../db/repositories/job_dependency.py | 107 +++++++++++++++--- data_rentgen/server/api/v1/router/job.py | 6 +- data_rentgen/server/schemas/v1/job.py | 1 + data_rentgen/server/services/job.py | 16 ++- tests/test_server/fixtures/factories/job.py | 45 ++++++++ .../test_jobs/test_job_dependencies.py | 60 ++++++++++ 6 files changed, 217 insertions(+), 18 deletions(-) diff --git a/data_rentgen/db/repositories/job_dependency.py b/data_rentgen/db/repositories/job_dependency.py index b54aa705..53620142 100644 --- a/data_rentgen/db/repositories/job_dependency.py +++ b/data_rentgen/db/repositories/job_dependency.py @@ -2,7 +2,8 @@ # SPDX-License-Identifier: Apache-2.0 from typing import Literal -from sqlalchemy import ARRAY, Integer, any_, bindparam, cast, func, or_, select, tuple_ +from sqlalchemy import ARRAY, Integer, any_, bindparam, cast, func, literal, select, tuple_ +from sqlalchemy.orm import aliased from data_rentgen.db.models.job_dependency import JobDependency from data_rentgen.db.repositories.base import Repository @@ -26,6 +27,66 @@ JobDependency.to_job_id == bindparam("to_job_id"), ) +source_jobs = aliased(JobDependency, name="source") +target_jobs = aliased(JobDependency, name="target") + +upstream_jobs_query_base_part = ( + select( + JobDependency.from_job_id.label("from_job_id"), + JobDependency.to_job_id.label("to_job_id"), + JobDependency.type.label("type"), + literal(1).label("depth"), + ) + .select_from(JobDependency) + .where(JobDependency.to_job_id == any_(bindparam("job_ids"))) +) +upstream_jobs_query_cte = upstream_jobs_query_base_part.cte(name="upstream_jobs_query", recursive=True) + +upstream_jobs_query_recursive_part = ( + select( + JobDependency.from_job_id.label("from_job_id"), + JobDependency.to_job_id.label("to_job_id"), + JobDependency.type.label("type"), + (upstream_jobs_query_cte.c.depth + 1).label("depth"), + ) + .select_from(JobDependency) + .where( + upstream_jobs_query_cte.c.depth < bindparam("depth"), + JobDependency.to_job_id == upstream_jobs_query_cte.c.from_job_id, + ) +) + + +upstream_jobs_query_cte = upstream_jobs_query_cte.union_all(upstream_jobs_query_recursive_part) + +downstream_jobs_query_base_part = ( + select( + JobDependency.from_job_id.label("from_job_id"), + JobDependency.to_job_id.label("to_job_id"), + JobDependency.type.label("type"), + literal(1).label("depth"), + ) + .select_from(JobDependency) + .where(JobDependency.from_job_id == any_(bindparam("job_ids"))) +) +downstream_jobs_query_cte = downstream_jobs_query_base_part.cte(name="downstream_jobs_query", recursive=True) + +downstream_jobs_query_recursive_part = ( + select( + JobDependency.from_job_id.label("from_job_id"), + JobDependency.to_job_id.label("to_job_id"), + JobDependency.type.label("type"), + (downstream_jobs_query_cte.c.depth + 1).label("depth"), + ) + .select_from(JobDependency) + .where( + downstream_jobs_query_cte.c.depth < bindparam("depth"), + JobDependency.from_job_id == downstream_jobs_query_cte.c.to_job_id, + ) +) + +downstream_jobs_query_cte = downstream_jobs_query_cte.union_all(downstream_jobs_query_recursive_part) + class JobDependencyRepository(Repository[JobDependency]): async def fetch_bulk( @@ -60,25 +121,43 @@ async def get_dependencies( self, job_ids: list[int], direction: Literal["UPSTREAM", "DOWNSTREAM", "BOTH"], + depth: int, ) -> list[JobDependency]: - job_dependency_query = select(JobDependency) match direction: case "UPSTREAM": - job_dependency_query = job_dependency_query.where(JobDependency.to_job_id == any_(bindparam("job_ids"))) + return await self._get_upstream_dependencies(job_ids=job_ids, depth=depth) case "DOWNSTREAM": - job_dependency_query = job_dependency_query.where( - JobDependency.from_job_id == any_(bindparam("job_ids")) - ) + return await self._get_downstream_dependencies(job_ids=job_ids, depth=depth) case "BOTH": - job_dependency_query = job_dependency_query.where( - or_( - JobDependency.from_job_id == any_(bindparam("job_ids")), - JobDependency.to_job_id == any_(bindparam("job_ids")), - ) - ) - scalars = await self._session.scalars(job_dependency_query, {"job_ids": job_ids}) - return list(scalars.all()) + result = [] + result.extend(await self._get_upstream_dependencies(job_ids=job_ids, depth=depth)) + result.extend(await self._get_downstream_dependencies(job_ids=job_ids, depth=depth)) + return result + + async def _get_upstream_dependencies(self, job_ids: list[int], depth: int) -> list[JobDependency]: + stmt = select( + upstream_jobs_query_cte.c.from_job_id, + upstream_jobs_query_cte.c.to_job_id, + upstream_jobs_query_cte.c.type, + ) + result = await self._session.execute(stmt, {"job_ids": job_ids, "depth": depth}) + return [ + JobDependency(from_job_id=from_job_id, to_job_id=to_job_id, type=type_) + for from_job_id, to_job_id, type_ in result.fetchall() + ] + + async def _get_downstream_dependencies(self, job_ids: list[int], depth: int) -> list[JobDependency]: + stmt = select( + downstream_jobs_query_cte.c.from_job_id, + downstream_jobs_query_cte.c.to_job_id, + downstream_jobs_query_cte.c.type, + ) + result = await self._session.execute(stmt, {"job_ids": job_ids, "depth": depth}) + return [ + JobDependency(from_job_id=from_job_id, to_job_id=to_job_id, type=type_) + for from_job_id, to_job_id, type_ in result.fetchall() + ] async def _get(self, job_dependency: JobDependencyDTO) -> JobDependency | None: return await self._session.scalar( diff --git a/data_rentgen/server/api/v1/router/job.py b/data_rentgen/server/api/v1/router/job.py index 7528864e..53b7d7b7 100644 --- a/data_rentgen/server/api/v1/router/job.py +++ b/data_rentgen/server/api/v1/router/job.py @@ -91,7 +91,11 @@ async def get_job_dependencies( job_service: Annotated[JobService, Depends()], current_user: Annotated[User, Depends(get_user())], ) -> JobDependenciesResponseV1: - job_dependencies = await job_service.get_job_dependencies(query_args.start_node_id, query_args.direction) + job_dependencies = await job_service.get_job_dependencies( + query_args.start_node_id, + query_args.direction, + query_args.depth, + ) return JobDependenciesResponseV1( relations=JobDependenciesRelationsV1( parents=[ diff --git a/data_rentgen/server/schemas/v1/job.py b/data_rentgen/server/schemas/v1/job.py index 79359400..7ef41963 100644 --- a/data_rentgen/server/schemas/v1/job.py +++ b/data_rentgen/server/schemas/v1/job.py @@ -129,4 +129,5 @@ class JobDependenciesQueryV1(BaseModel): description="Direction of the lineage", examples=["DOWNSTREAM", "UPSTREAM", "BOTH"], ) + depth: int = Field(description="Depth of dependencies between jobs", default=1) model_config = ConfigDict(extra="ignore") diff --git a/data_rentgen/server/services/job.py b/data_rentgen/server/services/job.py index 9d1f5186..af2cf520 100644 --- a/data_rentgen/server/services/job.py +++ b/data_rentgen/server/services/job.py @@ -111,8 +111,15 @@ async def get_job_dependencies( self, start_node_id: int, direction: Literal["UPSTREAM", "DOWNSTREAM", "BOTH"], + depth: int, ) -> JobDependenciesResult: - logger.info("Get Job dependencies with start at job with id %s and direction: %s", start_node_id, direction) + logger.info( + "Get Job dependencies with start at job with id %s and next params: direction: %s, depth: %s", + start_node_id, + direction, + depth, + ) + job_ids = {start_node_id} ancestor_relations = await self._uow.job.list_ancestor_relations([start_node_id]) descendant_relations = await self._uow.job.list_descendant_relations([start_node_id]) @@ -122,10 +129,13 @@ async def get_job_dependencies( | {p.child_job_id for p in descendant_relations} ) - dependencies = await self._uow.job_dependency.get_dependencies(job_ids=list(job_ids), direction=direction) + dependencies = await self._uow.job_dependency.get_dependencies( + job_ids=list(job_ids), + direction=direction, + depth=depth, + ) dependency_job_ids = {d.from_job_id for d in dependencies} | {d.to_job_id for d in dependencies} job_ids |= dependency_job_ids - # return ancestors for all found jobs in the graph ancestor_relations += await self._uow.job.list_ancestor_relations(list(dependency_job_ids)) job_ids |= {p.parent_job_id for p in ancestor_relations} diff --git a/tests/test_server/fixtures/factories/job.py b/tests/test_server/fixtures/factories/job.py index b8ec7500..8f87ab1b 100644 --- a/tests/test_server/fixtures/factories/job.py +++ b/tests/test_server/fixtures/factories/job.py @@ -327,6 +327,51 @@ async def jobs_with_same_parent_job( await clean_db(async_session) +@pytest_asyncio.fixture +async def job_dependency_depth_chain( + async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], +) -> AsyncGenerator[list[Job], None]: + """ + Linear dependency chain of 5 jobs without: + + job_1 → job_2 → job_3 → job_4 → job_5 + + Each arrow is a JobDependency edge with type "DIRECT_DEPENDENCY". + Used for testing depth-limited dependency queries. + """ + async with async_session_maker() as async_session: + location = await create_location(async_session) + job_type = await create_job_type(async_session) + + jobs = [] + for i in range(1, 6): + job = await create_job( + async_session, + location_id=location.id, + job_type_id=job_type.id, + job_kwargs={"name": f"depth-chain-job-{i}"}, + ) + jobs.append(job) + + async_session.add_all( + [ + JobDependency( + from_job_id=jobs[i].id, + to_job_id=jobs[i + 1].id, + type="DIRECT_DEPENDENCY", + ) + for i in range(len(jobs) - 1) + ], + ) + await async_session.commit() + async_session.expunge_all() + + yield jobs + + async with async_session_maker() as async_session: + await clean_db(async_session) + + @pytest_asyncio.fixture async def job_dependency_chain( async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], diff --git a/tests/test_server/test_jobs/test_job_dependencies.py b/tests/test_server/test_jobs/test_job_dependencies.py index 115a695c..3e304e3f 100644 --- a/tests/test_server/test_jobs/test_job_dependencies.py +++ b/tests/test_server/test_jobs/test_job_dependencies.py @@ -202,3 +202,63 @@ async def test_get_job_dependencies_with_direction_downstream( }, "nodes": {"jobs": jobs_to_json(expected_nodes)}, } + + +@pytest.mark.parametrize( + ["depth", "direction", "expected_dep_indices", "expected_job_indices"], + [ + (1, "DOWNSTREAM", [(2, 3)], [2, 3]), + (2, "DOWNSTREAM", [(2, 3), (3, 4)], [2, 3, 4]), + (1, "UPSTREAM", [(1, 2)], [1, 2]), + (2, "UPSTREAM", [(0, 1), (1, 2)], [0, 1, 2]), + (1, "BOTH", [(1, 2), (2, 3)], [1, 2, 3]), + (2, "BOTH", [(0, 1), (1, 2), (2, 3), (3, 4)], [0, 1, 2, 3, 4]), + ], + ids=[ + "depth_1-downstream", + "depth_2-downstream", + "depth_1-upstream", + "depth_2-upstream", + "depth_1-both", + "depth_2-both", + ], +) +async def test_get_job_dependencies_with_depth( + test_client: AsyncClient, + job_dependency_depth_chain: tuple[Job, ...], + async_session: AsyncSession, + mocked_user: MockedUser, + depth: int, + direction: str, + expected_dep_indices: list[tuple[int, int]], + expected_job_indices: list[int], +): + """ + Fixture chain: job_0 → job_1 → job_2 → job_3 → job_4 + Start node is always job_2 (middle of the chain). + """ + jobs = job_dependency_depth_chain + start_job = jobs[2] + + expected_jobs = await enrich_jobs([jobs[i] for i in expected_job_indices], async_session) + + response = await test_client.get( + "v1/jobs/dependencies", + headers={"Authorization": f"Bearer {mocked_user.access_token}"}, + params={"start_node_id": start_job.id, "depth": depth, "direction": direction}, + ) + assert response.status_code == HTTPStatus.OK, response.json() + assert response.json() == { + "relations": { + "parents": [], + "dependencies": [ + { + "from": {"kind": "JOB", "id": str(jobs[i].id)}, + "to": {"kind": "JOB", "id": str(jobs[j].id)}, + "type": "DIRECT_DEPENDENCY", + } + for i, j in sorted(expected_dep_indices) + ], + }, + "nodes": {"jobs": jobs_to_json(expected_jobs)}, + } From a7f110c72b1d650fe497ac8b12cbe4c90fa4908e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AF=D0=BA=D0=B8=D0=BC=D0=B5=D0=BD=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=90=D0=BB=D0=B5=D0=BA?= =?UTF-8?q?=D1=81=D0=B0=D0=BD=D0=B4=D1=80=D0=BE=D0=B2=D0=B8=D1=87?= Date: Thu, 19 Mar 2026 10:21:29 +0300 Subject: [PATCH 2/8] [DOP-34705] replace columns to model in cte --- .../db/repositories/job_dependency.py | 20 ++++--------------- data_rentgen/server/api/v1/router/job.py | 6 +++--- tests/test_server/fixtures/factories/job.py | 2 +- 3 files changed, 8 insertions(+), 20 deletions(-) diff --git a/data_rentgen/db/repositories/job_dependency.py b/data_rentgen/db/repositories/job_dependency.py index 53620142..db28f725 100644 --- a/data_rentgen/db/repositories/job_dependency.py +++ b/data_rentgen/db/repositories/job_dependency.py @@ -3,7 +3,6 @@ from typing import Literal from sqlalchemy import ARRAY, Integer, any_, bindparam, cast, func, literal, select, tuple_ -from sqlalchemy.orm import aliased from data_rentgen.db.models.job_dependency import JobDependency from data_rentgen.db.repositories.base import Repository @@ -27,14 +26,9 @@ JobDependency.to_job_id == bindparam("to_job_id"), ) -source_jobs = aliased(JobDependency, name="source") -target_jobs = aliased(JobDependency, name="target") - upstream_jobs_query_base_part = ( select( - JobDependency.from_job_id.label("from_job_id"), - JobDependency.to_job_id.label("to_job_id"), - JobDependency.type.label("type"), + JobDependency, literal(1).label("depth"), ) .select_from(JobDependency) @@ -44,9 +38,7 @@ upstream_jobs_query_recursive_part = ( select( - JobDependency.from_job_id.label("from_job_id"), - JobDependency.to_job_id.label("to_job_id"), - JobDependency.type.label("type"), + JobDependency, (upstream_jobs_query_cte.c.depth + 1).label("depth"), ) .select_from(JobDependency) @@ -61,9 +53,7 @@ downstream_jobs_query_base_part = ( select( - JobDependency.from_job_id.label("from_job_id"), - JobDependency.to_job_id.label("to_job_id"), - JobDependency.type.label("type"), + JobDependency, literal(1).label("depth"), ) .select_from(JobDependency) @@ -73,9 +63,7 @@ downstream_jobs_query_recursive_part = ( select( - JobDependency.from_job_id.label("from_job_id"), - JobDependency.to_job_id.label("to_job_id"), - JobDependency.type.label("type"), + JobDependency, (downstream_jobs_query_cte.c.depth + 1).label("depth"), ) .select_from(JobDependency) diff --git a/data_rentgen/server/api/v1/router/job.py b/data_rentgen/server/api/v1/router/job.py index 53b7d7b7..0fc94e86 100644 --- a/data_rentgen/server/api/v1/router/job.py +++ b/data_rentgen/server/api/v1/router/job.py @@ -92,9 +92,9 @@ async def get_job_dependencies( current_user: Annotated[User, Depends(get_user())], ) -> JobDependenciesResponseV1: job_dependencies = await job_service.get_job_dependencies( - query_args.start_node_id, - query_args.direction, - query_args.depth, + start_node_id=query_args.start_node_id, + direction=query_args.direction, + depth=query_args.depth, ) return JobDependenciesResponseV1( relations=JobDependenciesRelationsV1( diff --git a/tests/test_server/fixtures/factories/job.py b/tests/test_server/fixtures/factories/job.py index 8f87ab1b..11adeea2 100644 --- a/tests/test_server/fixtures/factories/job.py +++ b/tests/test_server/fixtures/factories/job.py @@ -332,7 +332,7 @@ async def job_dependency_depth_chain( async_session_maker: Callable[[], AbstractAsyncContextManager[AsyncSession]], ) -> AsyncGenerator[list[Job], None]: """ - Linear dependency chain of 5 jobs without: + Linear dependency chain of 5 jobs: job_1 → job_2 → job_3 → job_4 → job_5 From 0a0b26f142b96b61d0949949ee217f0b7b9d989c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AF=D0=BA=D0=B8=D0=BC=D0=B5=D0=BD=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=90=D0=BB=D0=B5=D0=BA?= =?UTF-8?q?=D1=81=D0=B0=D0=BD=D0=B4=D1=80=D0=BE=D0=B2=D0=B8=D1=87?= Date: Thu, 19 Mar 2026 10:41:06 +0300 Subject: [PATCH 3/8] [DOP-34705] replace columns to model in cte --- .../db/repositories/job_dependency.py | 29 ++++++------------- 1 file changed, 9 insertions(+), 20 deletions(-) diff --git a/data_rentgen/db/repositories/job_dependency.py b/data_rentgen/db/repositories/job_dependency.py index db28f725..d3bf5e19 100644 --- a/data_rentgen/db/repositories/job_dependency.py +++ b/data_rentgen/db/repositories/job_dependency.py @@ -3,6 +3,7 @@ from typing import Literal from sqlalchemy import ARRAY, Integer, any_, bindparam, cast, func, literal, select, tuple_ +from sqlalchemy.orm import aliased from data_rentgen.db.models.job_dependency import JobDependency from data_rentgen.db.repositories.base import Repository @@ -50,6 +51,7 @@ upstream_jobs_query_cte = upstream_jobs_query_cte.union_all(upstream_jobs_query_recursive_part) +upstream_entities_query = aliased(JobDependency, upstream_jobs_query_cte) downstream_jobs_query_base_part = ( select( @@ -74,6 +76,7 @@ ) downstream_jobs_query_cte = downstream_jobs_query_cte.union_all(downstream_jobs_query_recursive_part) +downstream_entities_query = aliased(JobDependency, downstream_jobs_query_cte) class JobDependencyRepository(Repository[JobDependency]): @@ -124,28 +127,14 @@ async def get_dependencies( return result async def _get_upstream_dependencies(self, job_ids: list[int], depth: int) -> list[JobDependency]: - stmt = select( - upstream_jobs_query_cte.c.from_job_id, - upstream_jobs_query_cte.c.to_job_id, - upstream_jobs_query_cte.c.type, - ) - result = await self._session.execute(stmt, {"job_ids": job_ids, "depth": depth}) - return [ - JobDependency(from_job_id=from_job_id, to_job_id=to_job_id, type=type_) - for from_job_id, to_job_id, type_ in result.fetchall() - ] + stmt = select(upstream_entities_query) + result = await self._session.scalars(stmt, {"job_ids": job_ids, "depth": depth}) + return list(result.all()) async def _get_downstream_dependencies(self, job_ids: list[int], depth: int) -> list[JobDependency]: - stmt = select( - downstream_jobs_query_cte.c.from_job_id, - downstream_jobs_query_cte.c.to_job_id, - downstream_jobs_query_cte.c.type, - ) - result = await self._session.execute(stmt, {"job_ids": job_ids, "depth": depth}) - return [ - JobDependency(from_job_id=from_job_id, to_job_id=to_job_id, type=type_) - for from_job_id, to_job_id, type_ in result.fetchall() - ] + stmt = select(downstream_entities_query) + result = await self._session.scalars(stmt, {"job_ids": job_ids, "depth": depth}) + return list(result.all()) async def _get(self, job_dependency: JobDependencyDTO) -> JobDependency | None: return await self._session.scalar( From 634784d406b469a26efe4ceea371d6c763862cec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AF=D0=BA=D0=B8=D0=BC=D0=B5=D0=BD=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=90=D0=BB=D0=B5=D0=BA?= =?UTF-8?q?=D1=81=D0=B0=D0=BD=D0=B4=D1=80=D0=BE=D0=B2=D0=B8=D1=87?= Date: Thu, 19 Mar 2026 11:32:33 +0300 Subject: [PATCH 4/8] [DOP-34705] add tests for edge cases --- .../test_jobs/test_job_dependencies.py | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/tests/test_server/test_jobs/test_job_dependencies.py b/tests/test_server/test_jobs/test_job_dependencies.py index 3e304e3f..17aabaf3 100644 --- a/tests/test_server/test_jobs/test_job_dependencies.py +++ b/tests/test_server/test_jobs/test_job_dependencies.py @@ -213,6 +213,7 @@ async def test_get_job_dependencies_with_direction_downstream( (2, "UPSTREAM", [(0, 1), (1, 2)], [0, 1, 2]), (1, "BOTH", [(1, 2), (2, 3)], [1, 2, 3]), (2, "BOTH", [(0, 1), (1, 2), (2, 3), (3, 4)], [0, 1, 2, 3, 4]), + (5, "BOTH", [(0, 1), (1, 2), (2, 3), (3, 4)], [0, 1, 2, 3, 4]), ], ids=[ "depth_1-downstream", @@ -221,6 +222,7 @@ async def test_get_job_dependencies_with_direction_downstream( "depth_2-upstream", "depth_1-both", "depth_2-both", + "depth_5-both", ], ) async def test_get_job_dependencies_with_depth( @@ -262,3 +264,43 @@ async def test_get_job_dependencies_with_depth( }, "nodes": {"jobs": jobs_to_json(expected_jobs)}, } + + +@pytest.mark.parametrize( + ["direction", "start_node_index"], + [ + ("UPSTREAM", 0), + ("DOWNSTREAM", 4), + ], + ids=["upstream_boundary", "downstream_boundary"], +) +async def test_get_job_dependencies_with_depth_on_boundary( + test_client: AsyncClient, + job_dependency_depth_chain: tuple[Job, ...], + async_session: AsyncSession, + mocked_user: MockedUser, + direction: str, + start_node_index: int, +): + """ + Fixture chain: job_0 → job_1 → job_2 → job_3 → job_4 + Start node is job_0 or job_4. + """ + jobs = job_dependency_depth_chain + start_job = jobs[start_node_index] + + [expected_job] = await enrich_jobs([start_job], async_session) + + response = await test_client.get( + "v1/jobs/dependencies", + headers={"Authorization": f"Bearer {mocked_user.access_token}"}, + params={"start_node_id": start_job.id, "depth": 2, "direction": direction}, + ) + assert response.status_code == HTTPStatus.OK, response.json() + assert response.json() == { + "relations": { + "parents": [], + "dependencies": [], + }, + "nodes": {"jobs": jobs_to_json([expected_job])}, + } From a7d41a754619648603b2c0097c4864636a24f5f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AF=D0=BA=D0=B8=D0=BC=D0=B5=D0=BD=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=90=D0=BB=D0=B5=D0=BA?= =?UTF-8?q?=D1=81=D0=B0=D0=BD=D0=B4=D1=80=D0=BE=D0=B2=D0=B8=D1=87?= Date: Thu, 19 Mar 2026 11:49:58 +0300 Subject: [PATCH 5/8] [DOP-34705] rebase fix --- data_rentgen/db/repositories/job_dependency.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/data_rentgen/db/repositories/job_dependency.py b/data_rentgen/db/repositories/job_dependency.py index d3bf5e19..83fd0904 100644 --- a/data_rentgen/db/repositories/job_dependency.py +++ b/data_rentgen/db/repositories/job_dependency.py @@ -51,7 +51,7 @@ upstream_jobs_query_cte = upstream_jobs_query_cte.union_all(upstream_jobs_query_recursive_part) -upstream_entities_query = aliased(JobDependency, upstream_jobs_query_cte) +upstream_entities_query = select(aliased(JobDependency, upstream_jobs_query_cte)) downstream_jobs_query_base_part = ( select( @@ -76,7 +76,7 @@ ) downstream_jobs_query_cte = downstream_jobs_query_cte.union_all(downstream_jobs_query_recursive_part) -downstream_entities_query = aliased(JobDependency, downstream_jobs_query_cte) +downstream_entities_query = select(aliased(JobDependency, downstream_jobs_query_cte)) class JobDependencyRepository(Repository[JobDependency]): @@ -127,13 +127,11 @@ async def get_dependencies( return result async def _get_upstream_dependencies(self, job_ids: list[int], depth: int) -> list[JobDependency]: - stmt = select(upstream_entities_query) - result = await self._session.scalars(stmt, {"job_ids": job_ids, "depth": depth}) + result = await self._session.scalars(upstream_entities_query, {"job_ids": job_ids, "depth": depth}) return list(result.all()) async def _get_downstream_dependencies(self, job_ids: list[int], depth: int) -> list[JobDependency]: - stmt = select(downstream_entities_query) - result = await self._session.scalars(stmt, {"job_ids": job_ids, "depth": depth}) + result = await self._session.scalars(downstream_entities_query, {"job_ids": job_ids, "depth": depth}) return list(result.all()) async def _get(self, job_dependency: JobDependencyDTO) -> JobDependency | None: From 4091ae29c59c0dbaa4f7243dfa17d6e78d4fb0a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AF=D0=BA=D0=B8=D0=BC=D0=B5=D0=BD=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=90=D0=BB=D0=B5=D0=BA?= =?UTF-8?q?=D1=81=D0=B0=D0=BD=D0=B4=D1=80=D0=BE=D0=B2=D0=B8=D1=87?= Date: Thu, 19 Mar 2026 11:55:00 +0300 Subject: [PATCH 6/8] [DOP-34705] add changelog --- docs/changelog/next_release/412.improvement.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 docs/changelog/next_release/412.improvement.rst diff --git a/docs/changelog/next_release/412.improvement.rst b/docs/changelog/next_release/412.improvement.rst new file mode 100644 index 00000000..36b68974 --- /dev/null +++ b/docs/changelog/next_release/412.improvement.rst @@ -0,0 +1 @@ +Add ``depth`` query parameter to ``GET /v1/jobs/dependencies`` endpoint, allowing control over how many layers of dependency are traversed. Defaults to ``1``. From 39505fb63865be06d1e5231b5937ea093629d6f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AF=D0=BA=D0=B8=D0=BC=D0=B5=D0=BD=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=90=D0=BB=D0=B5=D0=BA?= =?UTF-8?q?=D1=81=D0=B0=D0=BD=D0=B4=D1=80=D0=BE=D0=B2=D0=B8=D1=87?= Date: Thu, 19 Mar 2026 12:10:34 +0300 Subject: [PATCH 7/8] [DOP-34705] combine queries for direction both --- .../db/repositories/job_dependency.py | 21 +++++++------------ 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/data_rentgen/db/repositories/job_dependency.py b/data_rentgen/db/repositories/job_dependency.py index 83fd0904..0648e5ad 100644 --- a/data_rentgen/db/repositories/job_dependency.py +++ b/data_rentgen/db/repositories/job_dependency.py @@ -117,22 +117,15 @@ async def get_dependencies( match direction: case "UPSTREAM": - return await self._get_upstream_dependencies(job_ids=job_ids, depth=depth) + result = await self._session.scalars(upstream_entities_query, {"job_ids": job_ids, "depth": depth}) + return list(result.all()) case "DOWNSTREAM": - return await self._get_downstream_dependencies(job_ids=job_ids, depth=depth) + result = await self._session.scalars(downstream_entities_query, {"job_ids": job_ids, "depth": depth}) + return list(result.all()) case "BOTH": - result = [] - result.extend(await self._get_upstream_dependencies(job_ids=job_ids, depth=depth)) - result.extend(await self._get_downstream_dependencies(job_ids=job_ids, depth=depth)) - return result - - async def _get_upstream_dependencies(self, job_ids: list[int], depth: int) -> list[JobDependency]: - result = await self._session.scalars(upstream_entities_query, {"job_ids": job_ids, "depth": depth}) - return list(result.all()) - - async def _get_downstream_dependencies(self, job_ids: list[int], depth: int) -> list[JobDependency]: - result = await self._session.scalars(downstream_entities_query, {"job_ids": job_ids, "depth": depth}) - return list(result.all()) + query = select(aliased(JobDependency, (upstream_entities_query.union(downstream_entities_query)).cte())) + result = await self._session.scalars(query, {"job_ids": job_ids, "depth": depth}) + return list(result.all()) async def _get(self, job_dependency: JobDependencyDTO) -> JobDependency | None: return await self._session.scalar( From 168da5a997136be8e96d4afdd5a279343d94e234 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=AF=D0=BA=D0=B8=D0=BC=D0=B5=D0=BD=D0=BA=D0=BE=D0=B2=20?= =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=90=D0=BB=D0=B5=D0=BA?= =?UTF-8?q?=D1=81=D0=B0=D0=BD=D0=B4=D1=80=D0=BE=D0=B2=D0=B8=D1=87?= Date: Thu, 19 Mar 2026 12:19:33 +0300 Subject: [PATCH 8/8] [DOP-34705] move query for both into header --- .../db/repositories/job_dependency.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/data_rentgen/db/repositories/job_dependency.py b/data_rentgen/db/repositories/job_dependency.py index 0648e5ad..7606d9c1 100644 --- a/data_rentgen/db/repositories/job_dependency.py +++ b/data_rentgen/db/repositories/job_dependency.py @@ -50,7 +50,7 @@ ) -upstream_jobs_query_cte = upstream_jobs_query_cte.union_all(upstream_jobs_query_recursive_part) +upstream_jobs_query_cte = upstream_jobs_query_cte.union(upstream_jobs_query_recursive_part) upstream_entities_query = select(aliased(JobDependency, upstream_jobs_query_cte)) downstream_jobs_query_base_part = ( @@ -75,9 +75,11 @@ ) ) -downstream_jobs_query_cte = downstream_jobs_query_cte.union_all(downstream_jobs_query_recursive_part) +downstream_jobs_query_cte = downstream_jobs_query_cte.union(downstream_jobs_query_recursive_part) downstream_entities_query = select(aliased(JobDependency, downstream_jobs_query_cte)) +both_entities_query = select(aliased(JobDependency, (upstream_entities_query.union(downstream_entities_query)).cte())) + class JobDependencyRepository(Repository[JobDependency]): async def fetch_bulk( @@ -117,15 +119,14 @@ async def get_dependencies( match direction: case "UPSTREAM": - result = await self._session.scalars(upstream_entities_query, {"job_ids": job_ids, "depth": depth}) - return list(result.all()) + query = upstream_entities_query case "DOWNSTREAM": - result = await self._session.scalars(downstream_entities_query, {"job_ids": job_ids, "depth": depth}) - return list(result.all()) + query = downstream_entities_query case "BOTH": - query = select(aliased(JobDependency, (upstream_entities_query.union(downstream_entities_query)).cte())) - result = await self._session.scalars(query, {"job_ids": job_ids, "depth": depth}) - return list(result.all()) + query = both_entities_query + + result = await self._session.scalars(query, {"job_ids": job_ids, "depth": depth}) + return list(result.all()) async def _get(self, job_dependency: JobDependencyDTO) -> JobDependency | None: return await self._session.scalar(