Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packit_service/events/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def _add_to_mapping(kls: type["Event"]):

class Event(ABC):
task_accepted_time: Optional[datetime] = None
cancel_cutoff_time: Optional[datetime] = None
actor: Optional[str]

def __init__(self, created_at: Optional[Union[int, float, str]] = None):
Expand Down Expand Up @@ -139,6 +140,8 @@ def get_dict(self, default_dict: Optional[dict] = None) -> dict:
d["task_accepted_time"] = (
int(task_accepted_time.timestamp()) if task_accepted_time else None
)
cancel_cutoff_time = d.get("cancel_cutoff_time")
d["cancel_cutoff_time"] = cancel_cutoff_time.timestamp() if cancel_cutoff_time else None
d["project_url"] = d.get("project_url") or (
self.db_project_object.project.project_url
if (
Expand Down
9 changes: 9 additions & 0 deletions packit_service/events/event_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def __init__(
event_dict: Optional[dict],
issue_id: Optional[int],
task_accepted_time: Optional[datetime],
cancel_cutoff_time: Optional[datetime],
build_targets_override: Optional[set[tuple[str, str]]],
tests_targets_override: Optional[set[tuple[str, str]]],
branches_override: Optional[list[str]],
Expand All @@ -58,6 +59,7 @@ def __init__(
self.event_dict = event_dict
self.issue_id = issue_id
self.task_accepted_time = task_accepted_time
self.cancel_cutoff_time = cancel_cutoff_time
self.build_targets_override = (
set(build_targets_override) if build_targets_override else None
)
Expand Down Expand Up @@ -91,6 +93,9 @@ def from_event_dict(cls, event: dict):
time = event.get("task_accepted_time")
task_accepted_time = datetime.fromtimestamp(time, timezone.utc) if time else None

cutoff = event.get("cancel_cutoff_time")
cancel_cutoff_time = datetime.fromtimestamp(cutoff, timezone.utc) if cutoff else None

build_targets_override = (
{(target, identifier_) for [target, identifier_] in event.get("build_targets_override")}
if event.get("build_targets_override")
Expand Down Expand Up @@ -118,6 +123,7 @@ def from_event_dict(cls, event: dict):
event_dict=event,
issue_id=issue_id,
task_accepted_time=task_accepted_time,
cancel_cutoff_time=cancel_cutoff_time,
build_targets_override=build_targets_override,
tests_targets_override=tests_targets_override,
branches_override=branches_override,
Expand All @@ -138,6 +144,7 @@ def to_event(self) -> "Event":
kwargs.pop("event_type", None)
kwargs.pop("event_id", None)
kwargs.pop("task_accepted_time", None)
kwargs.pop("cancel_cutoff_time", None)
kwargs.pop("build_targets_override", None)
kwargs.pop("tests_targets_override", None)
kwargs.pop("branches_override", None)
Expand Down Expand Up @@ -321,6 +328,8 @@ def get_dict(self) -> dict:
d["task_accepted_time"] = (
int(task_accepted_time.timestamp()) if task_accepted_time else None
)
cancel_cutoff_time = d.get("cancel_cutoff_time")
d["cancel_cutoff_time"] = cancel_cutoff_time.timestamp() if cancel_cutoff_time else None
if self.build_targets_override:
d["build_targets_override"] = list(self.build_targets_override)
if self.tests_targets_override:
Expand Down
107 changes: 87 additions & 20 deletions packit_service/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
Data layer on top of PSQL using sqlalch
"""

import datetime as dt
import enum
import logging
import re
Expand Down Expand Up @@ -1943,6 +1944,25 @@ def create(
def get_project_event_object(self) -> AbstractProjectObjectDbType:
return self.project_event.get_project_event_object()

@classmethod
def get_latest_datetime_for_event(
    cls,
    project_event_type: ProjectEventModelType,
    event_id: int,
) -> Optional[dt.datetime]:
    """Look up when the newest pipeline for the given project object
    (e.g. a PR or branch) was created.

    Args:
        project_event_type: Type of the project event (e.g. pull_request).
        event_id: ID of the project object (e.g. PullRequestModel.id).

    Returns:
        Datetime of the most recent pipeline, or None if no pipelines exist.
    """
    # Filter criteria identifying all pipelines that belong to this
    # project object (PR, branch, release, ...).
    event_filters = (
        ProjectEventModel.type == project_event_type,
        ProjectEventModel.event_id == event_id,
    )
    with sa_session_transaction() as session:
        newest = (
            session.query(func.max(PipelineModel.datetime))
            .join(ProjectEventModel)
            .filter(*event_filters)
        )
        # scalar() yields the aggregated max, or None for an empty set
        return newest.scalar()

def __repr__(self):
return (
f"PipelineModel(id={self.id}, datetime='{datetime}', "
Expand Down Expand Up @@ -2175,30 +2195,43 @@ def get_by_id(cls, group_id: int) -> Optional["CoprBuildGroupModel"]:
return session.query(CoprBuildGroupModel).filter_by(id=group_id).first()

@classmethod
def get_running(
    cls,
    project_event_type: ProjectEventModelType,
    event_id: int,
    created_before: Optional[datetime] = None,
) -> Iterable["CoprBuildTargetModel"]:
    """Get list of currently running Copr builds for a given project object
    (e.g. a PR or branch).

    Args:
        project_event_type: Type of the project event (e.g. pull_request).
        event_id: ID of the project object (e.g. PullRequestModel.id).
        created_before: If set, only return builds whose pipeline was
            created at or before this datetime (used to avoid cancelling
            builds from the current trigger batch).

    Returns:
        An iterable over Copr target models that are currently in queue
        (running) or waiting for an SRPM.
    """
    with sa_session_transaction() as session:
        q = (
            session.query(CoprBuildTargetModel)
            .join(CoprBuildGroupModel)
            .join(PipelineModel)
            .join(ProjectEventModel)
            .filter(
                ProjectEventModel.type == project_event_type,
                ProjectEventModel.event_id == event_id,
                CoprBuildTargetModel.status.in_(
                    (BuildStatus.pending, BuildStatus.waiting_for_srpm)
                ),
            )
        )
        if created_before is not None:
            # exclude pipelines newer than the cutoff so builds from the
            # current trigger batch are not cancelled
            q = q.filter(PipelineModel.datetime <= created_before)
        return q


class BuildStatus(str, enum.Enum):
Expand Down Expand Up @@ -2629,18 +2662,22 @@ def get_running(
cls,
project_event_type: ProjectEventModelType,
event_id: int,
created_before: Optional[datetime] = None,
) -> Iterable["KojiBuildTargetModel"]:
"""Get running Koji builds for a given project object (e.g. a PR or branch).

Args:
project_event_type: Type of the project event (e.g. pull_request).
event_id: ID of the project object (e.g. PullRequestModel.id).
created_before: If set, only return builds whose pipeline was
created at or before this datetime (used to avoid cancelling
builds from the current trigger batch).

Returns:
An iterable over KojiBuildTargetModels in non-final states.
"""
with sa_session_transaction() as session:
return (
q = (
session.query(KojiBuildTargetModel)
.join(KojiBuildGroupModel)
.join(PipelineModel)
Expand All @@ -2651,6 +2688,9 @@ def get_running(
KojiBuildTargetModel.status.in_(("pending", "queued", "running")),
)
)
if created_before is not None:
q = q.filter(PipelineModel.datetime <= created_before)
return q


class BodhiUpdateTargetModel(GroupAndTargetModelConnector, Base):
Expand Down Expand Up @@ -3620,26 +3660,37 @@ def get_by_id(cls, group_id: int) -> Optional["TFTTestRunGroupModel"]:
return session.query(TFTTestRunGroupModel).filter_by(id=group_id).first()

@classmethod
def get_running(cls, commit_sha: str, ranch: str) -> Iterable["TFTTestRunTargetModel"]:
"""Get list of currently running Testing Farm runs matching the passed
arguments.
def get_running(
cls,
project_event_type: ProjectEventModelType,
event_id: int,
ranch: str,
created_before: Optional[datetime] = None,
) -> Iterable["TFTTestRunTargetModel"]:
"""Get list of currently running Testing Farm runs for a given project
object (e.g. a PR or branch).

Args:
commit_sha: Commit hash that is used for filtering the running jobs.
project_event_type: Type of the project event (e.g. pull_request).
event_id: ID of the project object (e.g. PullRequestModel.id).
ranch: Testing Farm ranch where the tests are supposed to be run.
created_before: If set, only return test runs whose pipeline was
created at or before this datetime (used to avoid cancelling
tests from the current trigger batch).

Returns:
An iterable over TFT target models that represent matching TF
runs that are _queued_ (already submitted to the TF) or _running_.
"""
with sa_session_transaction() as session:
return (
q = (
session.query(TFTTestRunTargetModel)
.join(TFTTestRunGroupModel)
.join(PipelineModel)
.join(ProjectEventModel)
.filter(
ProjectEventModel.commit_sha == commit_sha,
ProjectEventModel.type == project_event_type,
ProjectEventModel.event_id == event_id,
TFTTestRunGroupModel.ranch == ranch,
TFTTestRunTargetModel.status.in_(
(
Expand All @@ -3649,6 +3700,9 @@ def get_running(cls, commit_sha: str, ranch: str) -> Iterable["TFTTestRunTargetM
),
)
)
if created_before is not None:
q = q.filter(PipelineModel.datetime <= created_before)
return q


class TFTTestRunTargetModel(GroupAndTargetModelConnector, Base):
Expand Down Expand Up @@ -4953,28 +5007,41 @@ def get_by_id(cls, group_id: int) -> Optional["LogDetectiveRunGroupModel"]:
return session.query(LogDetectiveRunGroupModel).filter_by(id=group_id).first()

@classmethod
def get_running(
    cls,
    project_event_type: ProjectEventModelType,
    event_id: int,
    created_before: Optional[datetime] = None,
) -> Iterable[LogDetectiveRunModel]:
    """Get list of currently running Log Detective runs for a given project
    object (e.g. a PR or branch).

    Args:
        project_event_type: Type of the project event (e.g. pull_request).
        event_id: ID of the project object (e.g. PullRequestModel.id).
        created_before: If set, only return runs whose pipeline was
            created at or before this datetime (used to avoid cancelling
            runs from the current trigger batch).

    Returns:
        An iterable over Log Detective run models representing runs that
        are currently running.
    """
    with sa_session_transaction() as session:
        q = (
            session.query(LogDetectiveRunModel)
            .join(LogDetectiveRunGroupModel)
            .join(PipelineModel)
            .join(ProjectEventModel)
            .filter(
                ProjectEventModel.type == project_event_type,
                ProjectEventModel.event_id == event_id,
                LogDetectiveRunModel.status == LogDetectiveResult.running,
            )
        )
        if created_before is not None:
            # exclude pipelines newer than the cutoff so runs from the
            # current trigger batch are not cancelled
            q = q.filter(PipelineModel.datetime <= created_before)
        return q


@cached(cache=TTLCache(maxsize=2048, ttl=(60 * 60 * 24)))
Expand Down
2 changes: 1 addition & 1 deletion packit_service/worker/handlers/bodhi.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ def _get_or_create_bodhi_update_group_model(self) -> BodhiUpdateGroupModel:
group = None
for koji_build_data in self:
koji_build_target = KojiBuildTargetModel.get_by_task_id(
koji_build_data.task_id,
task_id=koji_build_data.task_id,
)
if koji_build_target:
run_model = koji_build_target.group_of_targets.runs[-1]
Expand Down
10 changes: 10 additions & 0 deletions packit_service/worker/handlers/copr.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,16 @@ def handle_testing_farm(self):

event_dict = self.data.get_dict()

if (
self.build
and self.build.group_of_targets.runs
and (cutoff := self.build.group_of_targets.runs[0].datetime)
):
# use the pipeline datetime of the current build rather than
# recomputing via `get_latest_datetime_for_event()`, to avoid
# canceling tests triggered by sibling builds from the same batch
event_dict["cancel_cutoff_time"] = cutoff.timestamp()

for job_config in self.copr_build_helper.job_tests_all:
if (
not job_config.skip_build
Expand Down
18 changes: 14 additions & 4 deletions packit_service/worker/helpers/build/copr_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -1024,10 +1024,20 @@ def get_configured_targets(self) -> set[str]:
def get_running_jobs(
self,
) -> Union[Iterable["CoprBuildTargetModel"], Iterable["TFTTestRunTargetModel"]]:
if sha := self.metadata.commit_sha_before:
yield from CoprBuildGroupModel.get_running(commit_sha=sha)

# [SAFETY] When there's no previous commit hash, yields nothing
if not self.db_project_event:
logger.warning("No db_project_event, cannot query running Copr builds.")
return
if not self.metadata.cancel_cutoff_time:
logger.warning(
"No cancel_cutoff_time, skipping running Copr builds query "
"to avoid canceling unrelated jobs."
)
return
yield from CoprBuildGroupModel.get_running(
project_event_type=self.db_project_event.type,
event_id=self.db_project_event.event_id,
created_before=self.metadata.cancel_cutoff_time,
)

def cancel_running_builds(self):
running_builds = list(self.get_running_jobs())
Expand Down
12 changes: 11 additions & 1 deletion packit_service/worker/helpers/build/koji_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,19 @@ def run_build(
return get_koji_task_id_and_url_from_stdout(out)

def get_running_jobs(self) -> Iterable[KojiBuildTargetModel]:
return KojiBuildGroupModel.get_running(
if not self.db_project_event:
logger.warning("No db_project_event, cannot query running Koji builds.")
return
if not self.metadata.cancel_cutoff_time:
logger.warning(
"No cancel_cutoff_time, skipping running Koji builds query "
"to avoid canceling unrelated jobs."
)
return
yield from KojiBuildGroupModel.get_running(
project_event_type=self.db_project_event.type,
event_id=self.db_project_event.event_id,
created_before=self.metadata.cancel_cutoff_time,
)

def cancel_running_builds(
Expand Down
19 changes: 14 additions & 5 deletions packit_service/worker/helpers/testing_farm.py
Original file line number Diff line number Diff line change
Expand Up @@ -1234,12 +1234,21 @@ def report_status_to_configured_job(
)

def get_running_jobs(self) -> Iterable["TFTTestRunTargetModel"]:
    """Yield currently running Testing Farm runs for this helper's project
    event on its default ranch.

    Yields nothing (with a warning) when either the DB project event or the
    cancel cutoff time is missing, to avoid cancelling unrelated jobs.
    """
    if not self.db_project_event:
        logger.warning("No db_project_event, cannot query running TF runs.")
        return
    if not self.metadata.cancel_cutoff_time:
        logger.warning(
            "No cancel_cutoff_time, skipping running TF runs query "
            "to avoid canceling unrelated jobs."
        )
        return
    yield from TFTTestRunGroupModel.get_running(
        project_event_type=self.db_project_event.type,
        event_id=self.db_project_event.event_id,
        ranch=self.tft_client.default_ranch,
        created_before=self.metadata.cancel_cutoff_time,
    )

def cancel_running_tests(self):
running_tests = list(self.get_running_jobs())
Expand Down
Loading
Loading