From 0714c60d7adf28fd8d772ecf242f9c2c6c84bee0 Mon Sep 17 00:00:00 2001 From: Enkidu93 Date: Wed, 13 May 2026 14:29:13 -0400 Subject: [PATCH 1/9] Enqueue jobs in BuildJobRunnerManager not BuildService --- .../IServalConfiguratorExtensions.cs | 6 + .../src/Serval.Machine.Shared/Models/Build.cs | 4 +- .../Services/BuildJobRunnerManager.cs | 165 ++++++++++++++++++ .../Services/BuildJobService.cs | 128 +++++--------- .../Services/ClearMLBuildJobRunner.cs | 9 - .../Services/ClearMLMonitorService.cs | 16 +- .../Services/IBuildJobRunner.cs | 1 - .../Services/IBuildJobService.cs | 3 - .../Services/TranslationBuildJobService.cs | 24 +-- .../Services/ClearMLMonitorServiceTests.cs | 10 +- .../Services/NmtEngineServiceTests.cs | 28 +-- .../Services/PreprocessBuildJobTests.cs | 36 ++-- .../Services/SmtTransferEngineServiceTests.cs | 16 +- .../Services/StatisticalEngineServiceTests.cs | 16 +- 14 files changed, 295 insertions(+), 167 deletions(-) create mode 100644 src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs diff --git a/src/Machine/src/Serval.Machine.Shared/Configuration/IServalConfiguratorExtensions.cs b/src/Machine/src/Serval.Machine.Shared/Configuration/IServalConfiguratorExtensions.cs index 16846d559..e373e7f3b 100644 --- a/src/Machine/src/Serval.Machine.Shared/Configuration/IServalConfiguratorExtensions.cs +++ b/src/Machine/src/Serval.Machine.Shared/Configuration/IServalConfiguratorExtensions.cs @@ -66,6 +66,9 @@ private static IServalConfigurator AddTranslationEngines(this IServalConfigurato configurator.AddTranslationEngine(EngineType.Nmt.ToString()); configurator.JobQueues.Add(BuildJobQueues.Nmt); + configurator.Services.AddSingleton>(); + configurator.Services.AddHostedService(p => p.GetRequiredService>()); + return configurator; } @@ -85,6 +88,9 @@ private static IServalConfigurator AddWordAlignmentEngines(this IServalConfigura configurator.Services.AddHostedService(); configurator.JobQueues.Add(BuildJobQueues.Statistical); + configurator.Services.AddSingleton>(); + configurator.Services.AddHostedService(p => p.GetRequiredService>()); + return configurator; } diff --git a/src/Machine/src/Serval.Machine.Shared/Models/Build.cs b/src/Machine/src/Serval.Machine.Shared/Models/Build.cs index 38a3bc739..c76bc5da2 100644 --- a/src/Machine/src/Serval.Machine.Shared/Models/Build.cs +++ b/src/Machine/src/Serval.Machine.Shared/Models/Build.cs @@ -6,6 +6,7 @@ public enum BuildJobState Pending, Active, Canceling, + Deleting, } public enum BuildJobRunnerType @@ -25,9 +26,10 @@ public record Build { public required string BuildId { get; init; } public required BuildJobState JobState { get; init; } - public required string JobId { get; init; } + public string? JobId { get; init; } public required BuildJobRunnerType BuildJobRunner { get; init; } public required BuildStage Stage { get; init; } public string? Options { get; set; } + public object? Data { get; init; } public required BuildExecutionData ExecutionData { get; init; } } diff --git a/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs b/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs new file mode 100644 index 000000000..8e3bf4a09 --- /dev/null +++ b/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs @@ -0,0 +1,165 @@ +namespace Serval.Machine.Shared.Services; + +public class BuildJobRunnerManager(IServiceProvider services, ILogger logger) + : RecurrentTask("Build job runner manager", services, RefreshPeriod, logger) + where TEngine : ITrainingEngine +{ + private static readonly TimeSpan RefreshPeriod = TimeSpan.FromSeconds(5); + + protected override async Task DoWorkAsync(IServiceScope scope, CancellationToken cancellationToken) + { + var logger = scope.ServiceProvider.GetRequiredService>>(); + var dataAccessContext = scope.ServiceProvider.GetRequiredService(); + var platformService = scope.ServiceProvider.GetRequiredService(); + var runners = scope + .ServiceProvider.GetRequiredService>() + .ToDictionary(r => r.Type); + var engines = scope.ServiceProvider.GetRequiredService>(); + + await DispatchPendingBuildsAsync( + engines, + runners, + logger, + dataAccessContext, + platformService, + cancellationToken + ); + await StopCancelingBuildsAsync(engines, runners, logger, cancellationToken); + await DeleteDeletingEngines(engines, runners, logger, cancellationToken); + } + + private static async Task DispatchPendingBuildsAsync( + IRepository engines, + IReadOnlyDictionary runners, + ILogger> logger, + IDataAccessContext dataAccessContext, + IPlatformService platformService, + CancellationToken cancellationToken + ) + { + foreach ( + TEngine engine in await engines.GetAllAsync( + e => e.CurrentBuild != null && e.CurrentBuild.JobState == BuildJobState.Pending, + cancellationToken + ) + ) + { + Build build = engine.CurrentBuild!; + if (!string.IsNullOrEmpty(build.JobId)) + // TODO - should these be cleaned up? + continue; + + if (!runners.TryGetValue(build.BuildJobRunner, out IBuildJobRunner? runner) || runner is null) + { + logger.LogWarning( + "No runner found for build {BuildId} on engine {EngineId}.", + build.BuildId, + engine.EngineId + ); + continue; + } + + string? jobId = null; + try + { + jobId = await runner.CreateJobAsync( + engine.Type, + engine.EngineId, + build.BuildId, + build.Stage, + build.Data, + build.Options, + cancellationToken + ); + await runner.EnqueueJobAsync(jobId, engine.Type, cancellationToken); + } + catch (Exception e) + { + logger.LogError(e, "Failed to dispatch build job for build {BuildId}.", build.BuildId); + await dataAccessContext.WithTransactionAsync( + async (ct) => + { + await platformService.BuildFaultedAsync(build.BuildId, e.Message, CancellationToken.None); + await engines.UpdateAsync( + e => + e.EngineId == engine.Id + && e.CurrentBuild != null + && e.CurrentBuild.BuildId == build.BuildId, + u => + { + u.Unset(e => e.CurrentBuild); + }, + cancellationToken: cancellationToken + ); + if (jobId != null) + await runner.DeleteJobAsync(jobId, CancellationToken.None); + }, + cancellationToken: CancellationToken.None + ); + } + } + } + + private static async Task StopCancelingBuildsAsync( + IRepository engines, + IReadOnlyDictionary runners, + ILogger> logger, + CancellationToken cancellationToken + ) + { + foreach ( + TEngine engine in await engines.GetAllAsync( + e => e.CurrentBuild != null && e.CurrentBuild.JobState == BuildJobState.Canceling, + cancellationToken + ) + ) + { + Build build = engine.CurrentBuild!; + if (string.IsNullOrEmpty(build.JobId)) + continue; + + try + { + await runners[build.BuildJobRunner].StopJobAsync(build.JobId, cancellationToken); + } + catch (Exception e) + { + logger.LogError( + e, + "Failed to stop job {JobId} for canceling build {BuildId}.", + build.JobId, + build.BuildId + ); + } + } + } + + private static async Task DeleteDeletingEngines( + IRepository engines, + IReadOnlyDictionary runners, + ILogger> logger, + CancellationToken cancellationToken + ) + { + foreach ( + TEngine engine in await engines.GetAllAsync( + e => e.CurrentBuild != null && e.CurrentBuild.JobState == BuildJobState.Canceling, + cancellationToken + ) + ) + { + foreach (BuildJobRunnerType runnerType in runners.Keys) + { + IBuildJobRunner runner = runners[runnerType]; + try + { + await runner.DeleteEngineAsync(engine.Id, cancellationToken); + } + catch (Exception e) + { + logger.LogError(e, "Failed to delete engine {EngineId}.", engine.EngineId); + } + } + } + } +} diff --git a/src/Machine/src/Serval.Machine.Shared/Services/BuildJobService.cs b/src/Machine/src/Serval.Machine.Shared/Services/BuildJobService.cs index b02b5aeb7..2f8774aa1 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/BuildJobService.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/BuildJobService.cs @@ -1,10 +1,8 @@ namespace Serval.Machine.Shared.Services; -public class BuildJobService(IEnumerable runners, IRepository engines) - : IBuildJobService +public class BuildJobService(IRepository engines) : IBuildJobService where TEngine : ITrainingEngine { - protected readonly Dictionary Runners = runners.ToDictionary(r => r.Type); protected readonly IRepository Engines = engines; public Task IsEngineBuilding(string engineId, CancellationToken cancellationToken = default) @@ -36,26 +34,13 @@ public async Task> GetBuildingEnginesAsync( return engine?.CurrentBuild; } - public async Task CreateEngineAsync( - string engineId, - string? name = null, - CancellationToken cancellationToken = default - ) - { - foreach (BuildJobRunnerType runnerType in Runners.Keys) - { - IBuildJobRunner runner = Runners[runnerType]; - await runner.CreateEngineAsync(engineId, name, cancellationToken); - } - } - - public async Task DeleteEngineAsync(string engineId, CancellationToken cancellationToken = default) + public Task DeleteEngineAsync(string engineId, CancellationToken cancellationToken = default) { - foreach (BuildJobRunnerType runnerType in Runners.Keys) - { - IBuildJobRunner runner = Runners[runnerType]; - await runner.DeleteEngineAsync(engineId, cancellationToken); - } + return Engines.UpdateAsync( + e => e.EngineId == engineId && e.CurrentBuild != null && (e.CurrentBuild.JobState == BuildJobState.Active), + u => u.Set(e => e.CurrentBuild!.JobState, BuildJobState.Deleting), + cancellationToken: cancellationToken + ); } public async Task StartBuildJobAsync( @@ -69,58 +54,36 @@ public async Task StartBuildJobAsync( CancellationToken cancellationToken = default ) { - IBuildJobRunner runner = Runners[runnerType]; - string jobId = await runner.CreateJobAsync( - engineType, - engineId, - buildId, - stage, - data, - buildOptions, - cancellationToken + TEngine? engine = await Engines.UpdateAsync( + e => + e.EngineId == engineId + && ( + (stage == BuildStage.Preprocess && e.CurrentBuild == null) + || ( + stage != BuildStage.Preprocess + && e.CurrentBuild != null + && e.CurrentBuild.JobState != BuildJobState.Canceling + ) + ), + u => + u.Set( + e => e.CurrentBuild, + new Build + { + BuildId = buildId, + JobId = null, + BuildJobRunner = runnerType, + Stage = stage, + JobState = BuildJobState.Pending, + Options = buildOptions, + Data = data, + ExecutionData = new BuildExecutionData(), + } + ), + cancellationToken: cancellationToken ); - try - { - TEngine? engine = await Engines.UpdateAsync( - e => - e.EngineId == engineId - && ( - (stage == BuildStage.Preprocess && e.CurrentBuild == null) - || ( - stage != BuildStage.Preprocess - && e.CurrentBuild != null - && e.CurrentBuild.JobState != BuildJobState.Canceling - ) - ), - u => - u.Set( - e => e.CurrentBuild, - new Build - { - BuildId = buildId, - JobId = jobId, - BuildJobRunner = runner.Type, - Stage = stage, - JobState = BuildJobState.Pending, - Options = buildOptions, - ExecutionData = new BuildExecutionData(), - } - ), - cancellationToken: cancellationToken - ); - if (engine is null) - { - await runner.DeleteJobAsync(jobId, CancellationToken.None); - return false; - } - await runner.EnqueueJobAsync(jobId, engine.Type, cancellationToken); - return true; - } - catch - { - await runner.DeleteJobAsync(jobId, CancellationToken.None); - throw; - } + + return engine is not null; } public virtual async Task<(string? BuildId, BuildJobState State)> CancelBuildJobAsync( @@ -130,7 +93,11 @@ public async Task StartBuildJobAsync( { // cancel a job that hasn't started yet TEngine? engine = await Engines.UpdateAsync( - e => e.EngineId == engineId && e.CurrentBuild != null && e.CurrentBuild.JobState == BuildJobState.Pending, + e => + e.EngineId == engineId + && e.CurrentBuild != null + && e.CurrentBuild.JobState == BuildJobState.Pending + && e.CurrentBuild.JobId == null, u => { u.Unset(b => b.CurrentBuild); @@ -139,25 +106,16 @@ public async Task StartBuildJobAsync( cancellationToken: cancellationToken ); if (engine is not null && engine.CurrentBuild is not null) - { - // job will be deleted from the queue - IBuildJobRunner runner = Runners[engine.CurrentBuild.BuildJobRunner]; - await runner.StopJobAsync(engine.CurrentBuild.JobId, CancellationToken.None); return (engine.CurrentBuild.BuildId, BuildJobState.None); - } - // cancel a job that is already running + // mark a job that is already running as canceling and the dispatcher will stop it engine = await Engines.UpdateAsync( - e => e.EngineId == engineId && e.CurrentBuild != null && e.CurrentBuild.JobState == BuildJobState.Active, + e => e.EngineId == engineId && e.CurrentBuild != null && (e.CurrentBuild.JobState == BuildJobState.Active), u => u.Set(e => e.CurrentBuild!.JobState, BuildJobState.Canceling), cancellationToken: cancellationToken ); if (engine is not null && engine.CurrentBuild is not null) - { - IBuildJobRunner runner = Runners[engine.CurrentBuild.BuildJobRunner]; - await runner.StopJobAsync(engine.CurrentBuild.JobId, CancellationToken.None); return (engine.CurrentBuild.BuildId, BuildJobState.Canceling); - } return (null, BuildJobState.None); } diff --git a/src/Machine/src/Serval.Machine.Shared/Services/ClearMLBuildJobRunner.cs b/src/Machine/src/Serval.Machine.Shared/Services/ClearMLBuildJobRunner.cs index e47ac4b67..c45c9fd6b 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/ClearMLBuildJobRunner.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/ClearMLBuildJobRunner.cs @@ -16,15 +16,6 @@ IOptionsMonitor options public BuildJobRunnerType Type => BuildJobRunnerType.ClearML; - public async Task CreateEngineAsync( - string engineId, - string? name = null, - CancellationToken cancellationToken = default - ) - { - await _clearMLService.CreateProjectAsync(engineId, name, cancellationToken); - } - public async Task DeleteEngineAsync(string engineId, CancellationToken cancellationToken = default) { string? projectId = await _clearMLService.GetProjectIdAsync(engineId, cancellationToken); diff --git a/src/Machine/src/Serval.Machine.Shared/Services/ClearMLMonitorService.cs b/src/Machine/src/Serval.Machine.Shared/Services/ClearMLMonitorService.cs index 90323234d..ed69ede33 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/ClearMLMonitorService.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/ClearMLMonitorService.cs @@ -72,11 +72,11 @@ var engine in await wordAlignmentBuildJobService.GetBuildingEnginesAsync( if (engineToBuildServiceDict.Count == 0) return; + IEnumerable clearMLTaskIds = engineToBuildServiceDict + .Where(e => !string.IsNullOrEmpty(e.Key.CurrentBuild?.JobId)) + .Select(e => e.Key.CurrentBuild?.JobId!); Dictionary tasks = ( - await _clearMLService.GetTasksByIdAsync( - engineToBuildServiceDict.Select(e => e.Key.CurrentBuild!.JobId), - cancellationToken - ) + await _clearMLService.GetTasksByIdAsync(clearMLTaskIds, cancellationToken) ).ToDictionary(t => t.Id); Dictionary> queuePositionsPerEngineType = new(); @@ -117,8 +117,14 @@ await _clearMLService.GetTasksByIdAsync( ), } ); - if (engine.CurrentBuild is null || !tasks.TryGetValue(engine.CurrentBuild.JobId, out ClearMLTask? task)) + if ( + engine.CurrentBuild is null + || engine.CurrentBuild.JobId is null + || !tasks.TryGetValue(engine.CurrentBuild.JobId, out ClearMLTask? task) + ) + { continue; + } if ( engine.CurrentBuild.JobState is BuildJobState.Pending diff --git a/src/Machine/src/Serval.Machine.Shared/Services/IBuildJobRunner.cs b/src/Machine/src/Serval.Machine.Shared/Services/IBuildJobRunner.cs index 0c04cbde9..83d44c373 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/IBuildJobRunner.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/IBuildJobRunner.cs @@ -4,7 +4,6 @@ public interface IBuildJobRunner { BuildJobRunnerType Type { get; } - Task CreateEngineAsync(string engineId, string? name = null, CancellationToken cancellationToken = default); Task DeleteEngineAsync(string engineId, CancellationToken cancellationToken = default); Task CreateJobAsync( diff --git a/src/Machine/src/Serval.Machine.Shared/Services/IBuildJobService.cs b/src/Machine/src/Serval.Machine.Shared/Services/IBuildJobService.cs index b92d6f900..cea017ba8 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/IBuildJobService.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/IBuildJobService.cs @@ -3,9 +3,6 @@ public interface IBuildJobService { Task IsEngineBuilding(string engineId, CancellationToken cancellationToken = default); - - Task CreateEngineAsync(string engineId, string? name = null, CancellationToken cancellationToken = default); - Task DeleteEngineAsync(string engineId, CancellationToken cancellationToken = default); Task StartBuildJobAsync( diff --git a/src/Machine/src/Serval.Machine.Shared/Services/TranslationBuildJobService.cs b/src/Machine/src/Serval.Machine.Shared/Services/TranslationBuildJobService.cs index 8dd2c493a..530b0cf62 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/TranslationBuildJobService.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/TranslationBuildJobService.cs @@ -1,7 +1,7 @@ namespace Serval.Machine.Shared.Services; -public class TranslationBuildJobService(IEnumerable runners, IRepository engines) - : BuildJobService(runners, engines) +public class TranslationBuildJobService(IRepository engines) + : BuildJobService(engines) { public override async Task<(string? BuildId, BuildJobState State)> CancelBuildJobAsync( string engineId, @@ -21,22 +21,26 @@ public class TranslationBuildJobService(IEnumerable runners, IR ); if (engine is not null && engine.CurrentBuild is not null) { - // job will be deleted from the queue - IBuildJobRunner runner = Runners[engine.CurrentBuild.BuildJobRunner]; - await runner.StopJobAsync(engine.CurrentBuild.JobId, CancellationToken.None); return (engine.CurrentBuild.BuildId, BuildJobState.None); } - // cancel a job that is already running + // cancel a job that is already running or already created engine = await Engines.UpdateAsync( - e => e.EngineId == engineId && e.CurrentBuild != null && e.CurrentBuild.JobState == BuildJobState.Active, - u => u.Set(e => e.CurrentBuild!.JobState, BuildJobState.Canceling), + e => + e.EngineId == engineId + && e.CurrentBuild != null + && ( + e.CurrentBuild.JobState == BuildJobState.Pending || e.CurrentBuild.JobState == BuildJobState.Active + ), + u => + { + u.Set(e => e.CurrentBuild!.JobState, BuildJobState.Canceling); + u.Set(e => e.CollectTrainSegmentPairs, false); + }, cancellationToken: cancellationToken ); if (engine is not null && engine.CurrentBuild is not null) { - IBuildJobRunner runner = Runners[engine.CurrentBuild.BuildJobRunner]; - await runner.StopJobAsync(engine.CurrentBuild.JobId, CancellationToken.None); return (engine.CurrentBuild.BuildId, BuildJobState.Canceling); } diff --git a/src/Machine/test/Serval.Machine.Shared.Tests/Services/ClearMLMonitorServiceTests.cs b/src/Machine/test/Serval.Machine.Shared.Tests/Services/ClearMLMonitorServiceTests.cs index 06b0d7bee..432b31e03 100644 --- a/src/Machine/test/Serval.Machine.Shared.Tests/Services/ClearMLMonitorServiceTests.cs +++ b/src/Machine/test/Serval.Machine.Shared.Tests/Services/ClearMLMonitorServiceTests.cs @@ -163,7 +163,7 @@ public async Task MonitorClearMLTasksPerDomain_QueuedStatus_UpdatesQueuePosition SetupBuildingEngines(engine); ClearMLTask task = CreateClearMLTask( - id: engine!.CurrentBuild!.JobId, + id: engine!.CurrentBuild!.JobId!, name: engine.CurrentBuild.BuildId, status: ClearMLTaskStatus.Queued, created: DateTime.UtcNow @@ -202,7 +202,7 @@ public async Task MonitorClearMLTasksPerDomain_InProgress_UpdatesProgressCorrect }; ClearMLTask task = CreateClearMLTask( - id: engine!.CurrentBuild!.JobId, + id: engine!.CurrentBuild!.JobId!, name: engine.CurrentBuild.BuildId, status: ClearMLTaskStatus.InProgress, created: DateTime.UtcNow, @@ -256,7 +256,7 @@ public async Task MonitorClearMLTasksPerDomain_CompletedStatus_ProperlyHandlesCo }; ClearMLTask task = CreateClearMLTask( - id: engine!.CurrentBuild!.JobId, + id: engine!.CurrentBuild!.JobId!, name: engine.CurrentBuild.BuildId, status: ClearMLTaskStatus.Completed, created: DateTime.UtcNow, @@ -331,7 +331,7 @@ public async Task MonitorClearMLTasksPerDomain_StoppedStatus_CancelsBuildAndClea // Setup stopped task ClearMLTask task = CreateClearMLTask( - id: engine!.CurrentBuild!.JobId, + id: engine!.CurrentBuild!.JobId!, name: engine.CurrentBuild.BuildId, status: ClearMLTaskStatus.Stopped, created: DateTime.UtcNow, @@ -380,7 +380,7 @@ public async Task MonitorClearMLTasksPerDomain_FailedStatus_ProperlyHandlesFailu // Setup failed task ClearMLTask task = CreateClearMLTask( - id: engine.CurrentBuild.JobId, + id: engine.CurrentBuild!.JobId!, name: engine.CurrentBuild.BuildId, status: ClearMLTaskStatus.Failed, created: DateTime.UtcNow, diff --git a/src/Machine/test/Serval.Machine.Shared.Tests/Services/NmtEngineServiceTests.cs b/src/Machine/test/Serval.Machine.Shared.Tests/Services/NmtEngineServiceTests.cs index a88910861..92d9af766 100644 --- a/src/Machine/test/Serval.Machine.Shared.Tests/Services/NmtEngineServiceTests.cs +++ b/src/Machine/test/Serval.Machine.Shared.Tests/Services/NmtEngineServiceTests.cs @@ -173,20 +173,20 @@ public TestEnvironment() } ); BuildJobService = new BuildJobService( - [ - new HangfireBuildJobRunner(_jobClient, [new NmtHangfireBuildJobFactory()]), - new ClearMLBuildJobRunner( - ClearMLService, - [ - new NmtClearMLBuildJobFactory( - SharedFileService, - Substitute.For(), - Engines - ), - ], - BuildJobOptions - ), - ], + // [ + // new HangfireBuildJobRunner(_jobClient, [new NmtHangfireBuildJobFactory()]), + // new ClearMLBuildJobRunner( + // ClearMLService, + // [ + // new NmtClearMLBuildJobFactory( + // SharedFileService, + // Substitute.For(), + // Engines + // ), + // ], + // BuildJobOptions + // ), + // ], Engines ); var clearMLOptions = Substitute.For>(); diff --git a/src/Machine/test/Serval.Machine.Shared.Tests/Services/PreprocessBuildJobTests.cs b/src/Machine/test/Serval.Machine.Shared.Tests/Services/PreprocessBuildJobTests.cs index 50c802717..1e9901ba0 100644 --- a/src/Machine/test/Serval.Machine.Shared.Tests/Services/PreprocessBuildJobTests.cs +++ b/src/Machine/test/Serval.Machine.Shared.Tests/Services/PreprocessBuildJobTests.cs @@ -258,24 +258,24 @@ public TestEnvironment() .Returns(Task.FromResult("job1")); SharedFileService = new SharedFileService(Substitute.For()); BuildJobService = new BuildJobService( - [ - new HangfireBuildJobRunner( - Substitute.For(), - [new NmtHangfireBuildJobFactory(), new SmtTransferHangfireBuildJobFactory()] - ), - new ClearMLBuildJobRunner( - ClearMLService, - [ - new NmtClearMLBuildJobFactory( - SharedFileService, - Substitute.For(), - Engines - ), - new SmtTransferClearMLBuildJobFactory(SharedFileService, Engines), - ], - BuildJobOptions - ), - ], + // [ + // new HangfireBuildJobRunner( + // Substitute.For(), + // [new NmtHangfireBuildJobFactory(), new SmtTransferHangfireBuildJobFactory()] + // ), + // new ClearMLBuildJobRunner( + // ClearMLService, + // [ + // new NmtClearMLBuildJobFactory( + // SharedFileService, + // Substitute.For(), + // Engines + // ), + // new SmtTransferClearMLBuildJobFactory(SharedFileService, Engines), + // ], + // BuildJobOptions + // ), + // ], Engines ); ParallelCorpusService = Substitute.For(); diff --git a/src/Machine/test/Serval.Machine.Shared.Tests/Services/SmtTransferEngineServiceTests.cs b/src/Machine/test/Serval.Machine.Shared.Tests/Services/SmtTransferEngineServiceTests.cs index 4fd088aee..168320816 100644 --- a/src/Machine/test/Serval.Machine.Shared.Tests/Services/SmtTransferEngineServiceTests.cs +++ b/src/Machine/test/Serval.Machine.Shared.Tests/Services/SmtTransferEngineServiceTests.cs @@ -344,14 +344,14 @@ public TestEnvironment(BuildJobRunnerType trainJobRunnerType = BuildJobRunnerTyp Substitute.For>() ); BuildJobService = new BuildJobService( - [ - new HangfireBuildJobRunner(_jobClient, [new SmtTransferHangfireBuildJobFactory()]), - new ClearMLBuildJobRunner( - ClearMLService, - [new SmtTransferClearMLBuildJobFactory(SharedFileService, Engines)], - BuildJobOptions - ), - ], + // [ + // new HangfireBuildJobRunner(_jobClient, [new SmtTransferHangfireBuildJobFactory()]), + // new ClearMLBuildJobRunner( + // ClearMLService, + // [new SmtTransferClearMLBuildJobFactory(SharedFileService, Engines)], + // BuildJobOptions + // ), + // ], Engines ); _jobServer = CreateJobServer(); diff --git a/src/Machine/test/Serval.Machine.Shared.Tests/Services/StatisticalEngineServiceTests.cs b/src/Machine/test/Serval.Machine.Shared.Tests/Services/StatisticalEngineServiceTests.cs index c6d6ea0d5..f66587d8c 100644 --- a/src/Machine/test/Serval.Machine.Shared.Tests/Services/StatisticalEngineServiceTests.cs +++ b/src/Machine/test/Serval.Machine.Shared.Tests/Services/StatisticalEngineServiceTests.cs @@ -227,14 +227,14 @@ public TestEnvironment(BuildJobRunnerType trainJobRunnerType = BuildJobRunnerTyp Substitute.For>() ); BuildJobService = new BuildJobService( - [ - new HangfireBuildJobRunner(_jobClient, [new StatisticalHangfireBuildJobFactory()]), - new ClearMLBuildJobRunner( - ClearMLService, - [new StatisticalClearMLBuildJobFactory(SharedFileService, Engines)], - BuildJobOptions - ), - ], + // [ + // new HangfireBuildJobRunner(_jobClient, [new StatisticalHangfireBuildJobFactory()]), + // new ClearMLBuildJobRunner( + // ClearMLService, + // [new StatisticalClearMLBuildJobFactory(SharedFileService, Engines)], + // BuildJobOptions + // ), + // ], Engines ); _jobServer = CreateJobServer(); From e068e562e3333bd0126d64cbaf7d555c880e92a8 Mon Sep 17 00:00:00 2001 From: Enkidu93 Date: Wed, 13 May 2026 14:59:37 -0400 Subject: [PATCH 2/9] Add Queued build state --- .../src/Serval.Machine.Shared/Models/Build.cs | 1 + .../Services/BuildJobRunnerManager.cs | 37 ++++++++++++------- .../Services/BuildJobService.cs | 6 +-- 3 files changed, 28 insertions(+), 16 deletions(-) diff --git a/src/Machine/src/Serval.Machine.Shared/Models/Build.cs b/src/Machine/src/Serval.Machine.Shared/Models/Build.cs index c76bc5da2..a536a35c3 100644 --- a/src/Machine/src/Serval.Machine.Shared/Models/Build.cs +++ b/src/Machine/src/Serval.Machine.Shared/Models/Build.cs @@ -6,6 +6,7 @@ public enum BuildJobState Pending, Active, Canceling, + Queued, Deleting, } diff --git a/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs b/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs index 8e3bf4a09..ebfc9ff83 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs @@ -16,7 +16,7 @@ protected override async Task DoWorkAsync(IServiceScope scope, CancellationToken .ToDictionary(r => r.Type); var engines = scope.ServiceProvider.GetRequiredService>(); - await DispatchPendingBuildsAsync( + await DispatchQueuedBuildsAsync( engines, runners, logger, @@ -28,7 +28,7 @@ await DispatchPendingBuildsAsync( await DeleteDeletingEngines(engines, runners, logger, cancellationToken); } - private static async Task DispatchPendingBuildsAsync( + private static async Task DispatchQueuedBuildsAsync( IRepository engines, IReadOnlyDictionary runners, ILogger> logger, @@ -39,7 +39,7 @@ CancellationToken cancellationToken { foreach ( TEngine engine in await engines.GetAllAsync( - e => e.CurrentBuild != null && e.CurrentBuild.JobState == BuildJobState.Pending, + e => e.CurrentBuild != null && e.CurrentBuild.JobState == BuildJobState.Queued, cancellationToken ) ) @@ -62,16 +62,27 @@ TEngine engine in await engines.GetAllAsync( string? jobId = null; try { - jobId = await runner.CreateJobAsync( - engine.Type, - engine.EngineId, - build.BuildId, - build.Stage, - build.Data, - build.Options, - cancellationToken + await dataAccessContext.WithTransactionAsync( + async (ct) => + { + await engines.UpdateAsync( + e => e.EngineId == engine.Id && e.CurrentBuild == null, + u => u.Set(e => e.CurrentBuild!.JobState, BuildJobState.Pending), + cancellationToken: ct + ); + jobId = await runner.CreateJobAsync( + engine.Type, + engine.EngineId, + build.BuildId, + build.Stage, + build.Data, + build.Options, + ct + ); + await runner.EnqueueJobAsync(jobId, engine.Type, cancellationToken); + }, + cancellationToken: CancellationToken.None ); - await runner.EnqueueJobAsync(jobId, engine.Type, cancellationToken); } catch (Exception e) { @@ -143,7 +154,7 @@ CancellationToken cancellationToken { foreach ( TEngine engine in await engines.GetAllAsync( - e => e.CurrentBuild != null && e.CurrentBuild.JobState == BuildJobState.Canceling, + e => e.CurrentBuild != null && e.CurrentBuild.JobState == BuildJobState.Deleting, cancellationToken ) ) diff --git a/src/Machine/src/Serval.Machine.Shared/Services/BuildJobService.cs b/src/Machine/src/Serval.Machine.Shared/Services/BuildJobService.cs index 2f8774aa1..1fd11ac65 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/BuildJobService.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/BuildJobService.cs @@ -74,7 +74,7 @@ public async Task StartBuildJobAsync( JobId = null, BuildJobRunner = runnerType, Stage = stage, - JobState = BuildJobState.Pending, + JobState = BuildJobState.Queued, Options = buildOptions, Data = data, ExecutionData = new BuildExecutionData(), @@ -96,7 +96,7 @@ public async Task StartBuildJobAsync( e => e.EngineId == engineId && e.CurrentBuild != null - && e.CurrentBuild.JobState == BuildJobState.Pending + && (e.CurrentBuild.JobState == BuildJobState.Pending || e.CurrentBuild.JobState == BuildJobState.Queued) && e.CurrentBuild.JobId == null, u => { @@ -161,7 +161,7 @@ public Task BuildJobRestartingAsync(string engineId, string buildId, Cancellatio { return Engines.UpdateAsync( e => e.EngineId == engineId && e.CurrentBuild != null && e.CurrentBuild.BuildId == buildId, - u => u.Set(e => e.CurrentBuild!.JobState, BuildJobState.Pending), + u => u.Set(e => e.CurrentBuild!.JobState, BuildJobState.Queued), cancellationToken: cancellationToken ); } From 591f7a19163566e271c8f6bdbd8b6a5ac09c8123 Mon Sep 17 00:00:00 2001 From: Enkidu93 Date: Wed, 13 May 2026 15:10:23 -0400 Subject: [PATCH 3/9] Add TODOs --- .../Services/BuildJobRunnerManager.cs | 39 ++++++++----------- 1 file changed, 16 insertions(+), 23 deletions(-) diff --git a/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs b/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs index ebfc9ff83..03b041d21 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs @@ -46,19 +46,9 @@ TEngine engine in await engines.GetAllAsync( { Build build = engine.CurrentBuild!; if (!string.IsNullOrEmpty(build.JobId)) - // TODO - should these be cleaned up? + //TODO - should these be cleaned up? continue; - if (!runners.TryGetValue(build.BuildJobRunner, out IBuildJobRunner? runner) || runner is null) - { - logger.LogWarning( - "No runner found for build {BuildId} on engine {EngineId}.", - build.BuildId, - engine.EngineId - ); - continue; - } - string? jobId = null; try { @@ -66,20 +56,21 @@ await dataAccessContext.WithTransactionAsync( async (ct) => { await engines.UpdateAsync( - e => e.EngineId == engine.Id && e.CurrentBuild == null, + e => e.EngineId == engine.Id, u => u.Set(e => e.CurrentBuild!.JobState, BuildJobState.Pending), cancellationToken: ct ); - jobId = await runner.CreateJobAsync( - engine.Type, - engine.EngineId, - build.BuildId, - build.Stage, - build.Data, - build.Options, - ct - ); - await runner.EnqueueJobAsync(jobId, engine.Type, cancellationToken); + jobId = await runners[build.BuildJobRunner] + .CreateJobAsync( + engine.Type, + engine.EngineId, + build.BuildId, + build.Stage, + build.Data, + build.Options, + ct + ); + await runners[build.BuildJobRunner].EnqueueJobAsync(jobId, engine.Type, cancellationToken); }, cancellationToken: CancellationToken.None ); @@ -103,7 +94,7 @@ await engines.UpdateAsync( cancellationToken: cancellationToken ); if (jobId != null) - await runner.DeleteJobAsync(jobId, CancellationToken.None); + await runners[build.BuildJobRunner].DeleteJobAsync(jobId, CancellationToken.None); }, cancellationToken: CancellationToken.None ); @@ -127,6 +118,7 @@ TEngine engine in await engines.GetAllAsync( { Build build = engine.CurrentBuild!; if (string.IsNullOrEmpty(build.JobId)) + //TODO - should these be cleaned up? continue; try @@ -152,6 +144,7 @@ private static async Task DeleteDeletingEngines( CancellationToken cancellationToken ) { + //TODO what about non-building engines? For ClearML this would still be needed foreach ( TEngine engine in await engines.GetAllAsync( e => e.CurrentBuild != null && e.CurrentBuild.JobState == BuildJobState.Deleting, From 530a3ca67d5b91d07e3494a83747c02979fc73ba Mon Sep 17 00:00:00 2001 From: Enkidu93 Date: Wed, 13 May 2026 15:11:11 -0400 Subject: [PATCH 4/9] Fix naming --- .../Services/BuildJobRunnerManager.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs b/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs index 03b041d21..c8c8a5a3e 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs @@ -16,7 +16,7 @@ protected override async Task DoWorkAsync(IServiceScope scope, CancellationToken .ToDictionary(r => r.Type); var engines = scope.ServiceProvider.GetRequiredService>(); - await DispatchQueuedBuildsAsync( + await DispatchQueuedBuildJobsAsync( engines, runners, logger, @@ -24,11 +24,11 @@ await DispatchQueuedBuildsAsync( platformService, cancellationToken ); - await StopCancelingBuildsAsync(engines, runners, logger, cancellationToken); + await StopCancelingBuildJobsAsync(engines, runners, logger, cancellationToken); await DeleteDeletingEngines(engines, runners, logger, cancellationToken); } - private static async Task DispatchQueuedBuildsAsync( + private static async Task DispatchQueuedBuildJobsAsync( IRepository engines, IReadOnlyDictionary runners, ILogger> logger, @@ -102,7 +102,7 @@ await engines.UpdateAsync( } } - private static async Task StopCancelingBuildsAsync( + private static async Task StopCancelingBuildJobsAsync( IRepository engines, IReadOnlyDictionary runners, ILogger> logger, From 52cdf225d1c4a1bc866778165e37caaef289b029 Mon Sep 17 00:00:00 2001 From: Enkidu93 Date: Thu, 14 May 2026 17:36:01 -0400 Subject: [PATCH 5/9] Move data into Build model --- .../src/Serval.Machine.Shared/Models/Build.cs | 2 +- .../Serval.Machine.Shared/Models/BuildData.cs | 8 +++ .../Services/BuildJobRunnerManager.cs | 16 +++++- .../Services/BuildJobService.cs | 2 +- .../Services/ClearMLBuildJobRunner.cs | 2 - .../Services/ClearMLMonitorService.cs | 2 +- .../Services/HangfireBuildJob.cs | 45 +++------------- .../Services/HangfireBuildJobRunner.cs | 21 +------- .../Services/IBuildJobRunner.cs | 1 - .../Services/IBuildJobService.cs | 2 +- .../Services/IClearMLBuildJobFactory.cs | 1 - .../Services/IHangfireBuildJobFactory.cs | 2 +- .../Services/NmtClearMLBuildJobFactory.cs | 1 - .../Services/NmtEngineService.cs | 2 +- .../Services/NmtHangfireBuildJobFactory.cs | 16 +++--- .../Services/PostprocessBuildJob.cs | 9 +--- .../Services/PreprocessBuildJob.cs | 19 ++----- .../SmtTransferClearMLBuildJobFactory.cs | 1 - .../Services/SmtTransferEngineService.cs | 2 +- .../SmtTransferHangfireBuildJobFactory.cs | 16 +++--- .../Services/SmtTransferPreprocessBuildJob.cs | 7 +-- .../Services/SmtTransferTrainBuildJob.cs | 16 ++---- .../Services/StatisticalEngineService.cs | 2 +- .../StatisticalHangfireBuildJobFactory.cs | 16 +++--- .../StatisticalPostprocessBuildJob.cs | 9 +++- .../Services/StatisticalTrainBuildJob.cs | 14 ++--- .../StatsiticalClearMLBuildJobFactory.cs | 1 - .../TranslationPostprocessBuildJob.cs | 9 +++- .../Services/ClearMLMonitorServiceTests.cs | 4 +- .../Services/NmtEngineServiceTests.cs | 2 +- .../Services/PreprocessBuildJobTests.cs | 51 +++++++++---------- .../Services/SmtTransferEngineServiceTests.cs | 2 +- .../Services/StatisticalEngineServiceTests.cs | 2 +- 33 files changed, 120 insertions(+), 185 deletions(-) create mode 100644 src/Machine/src/Serval.Machine.Shared/Models/BuildData.cs diff --git a/src/Machine/src/Serval.Machine.Shared/Models/Build.cs b/src/Machine/src/Serval.Machine.Shared/Models/Build.cs index a536a35c3..6a9f084ae 100644 --- a/src/Machine/src/Serval.Machine.Shared/Models/Build.cs +++ b/src/Machine/src/Serval.Machine.Shared/Models/Build.cs @@ -31,6 +31,6 @@ public record Build public required BuildJobRunnerType BuildJobRunner { get; init; } public required BuildStage Stage { get; init; } public string? Options { get; set; } - public object? Data { get; init; } + public BuildData? Data { get; init; } public required BuildExecutionData ExecutionData { get; init; } } diff --git a/src/Machine/src/Serval.Machine.Shared/Models/BuildData.cs b/src/Machine/src/Serval.Machine.Shared/Models/BuildData.cs new file mode 100644 index 000000000..1135198e0 --- /dev/null +++ b/src/Machine/src/Serval.Machine.Shared/Models/BuildData.cs @@ -0,0 +1,8 @@ +namespace Serval.Machine.Shared.Models; + +public record BuildData +{ + public IReadOnlyList? ParallelCorpora { get; init; } + public double? Confidence { get; init; } + public int? CorpusSize { get; init; } +} diff --git a/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs b/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs index c8c8a5a3e..b1fd8bf20 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs @@ -8,9 +8,10 @@ public class BuildJobRunnerManager(IServiceProvider services, ILogger(); var logger = scope.ServiceProvider.GetRequiredService>>(); var dataAccessContext = scope.ServiceProvider.GetRequiredService(); - var platformService = scope.ServiceProvider.GetRequiredService(); + var platformService = scope.ServiceProvider.GetRequiredKeyedService(engineGroup); var runners = scope .ServiceProvider.GetRequiredService>() .ToDictionary(r => r.Type); @@ -66,7 +67,6 @@ await engines.UpdateAsync( engine.EngineId, build.BuildId, build.Stage, - build.Data, build.Options, ct ); @@ -166,4 +166,16 @@ TEngine engine in await engines.GetAllAsync( } } } + + private static EngineGroup GetEngineGroup() + where T : ITrainingEngine + { + //TODO is there a better way? Could just explicitly create translation and alignment managers? + return typeof(T).Name switch + { + nameof(TranslationEngine) => EngineGroup.Translation, + nameof(WordAlignmentEngine) => EngineGroup.WordAlignment, + _ => throw new InvalidOperationException($"Unknown engine type: {typeof(T).Name}"), + }; + } } diff --git a/src/Machine/src/Serval.Machine.Shared/Services/BuildJobService.cs b/src/Machine/src/Serval.Machine.Shared/Services/BuildJobService.cs index 1fd11ac65..598292b39 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/BuildJobService.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/BuildJobService.cs @@ -49,7 +49,7 @@ public async Task StartBuildJobAsync( string engineId, string buildId, BuildStage stage, - object? data = null, + BuildData? data = null, string? buildOptions = null, CancellationToken cancellationToken = default ) diff --git a/src/Machine/src/Serval.Machine.Shared/Services/ClearMLBuildJobRunner.cs b/src/Machine/src/Serval.Machine.Shared/Services/ClearMLBuildJobRunner.cs index c45c9fd6b..007a46759 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/ClearMLBuildJobRunner.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/ClearMLBuildJobRunner.cs @@ -28,7 +28,6 @@ public async Task CreateJobAsync( string engineId, string buildId, BuildStage stage, - object? data = null, string? buildOptions = null, CancellationToken cancellationToken = default ) @@ -46,7 +45,6 @@ public async Task CreateJobAsync( buildId, _options[engineType].ModelType, stage, - data, buildOptions, cancellationToken ); diff --git a/src/Machine/src/Serval.Machine.Shared/Services/ClearMLMonitorService.cs b/src/Machine/src/Serval.Machine.Shared/Services/ClearMLMonitorService.cs index ed69ede33..2935df0eb 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/ClearMLMonitorService.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/ClearMLMonitorService.cs @@ -311,7 +311,7 @@ CancellationToken cancellationToken engineId, buildId, BuildStage.Postprocess, - (corpusSize, confidence), + new BuildData { CorpusSize = corpusSize, Confidence = confidence }, buildOptions, cancellationToken ); diff --git a/src/Machine/src/Serval.Machine.Shared/Services/HangfireBuildJob.cs b/src/Machine/src/Serval.Machine.Shared/Services/HangfireBuildJob.cs index 0b40ee6e5..872b5cc3a 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/HangfireBuildJob.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/HangfireBuildJob.cs @@ -6,27 +6,6 @@ public abstract class HangfireBuildJob( IDataAccessContext dataAccessContext, IBuildJobService buildJobService, ILogger> logger -) : HangfireBuildJob(platformService, engines, dataAccessContext, buildJobService, logger) - where TEngine : ITrainingEngine -{ - [AutomaticRetry(Attempts = 0)] - public virtual Task RunAsync( - string engineId, - string buildId, - string? buildOptions, - CancellationToken cancellationToken - ) - { - return RunAsync(engineId, buildId, null, buildOptions, cancellationToken); - } -} - -public abstract class HangfireBuildJob( - IPlatformService platformService, - IRepository engines, - IDataAccessContext dataAccessContext, - IBuildJobService buildJobService, - ILogger> logger ) where TEngine : ITrainingEngine { @@ -34,13 +13,12 @@ ILogger> logger protected IRepository Engines { get; } = engines; protected IDataAccessContext DataAccessContext { get; } = dataAccessContext; protected IBuildJobService BuildJobService { get; } = buildJobService; - protected ILogger> Logger { get; } = logger; + protected ILogger> Logger { get; } = logger; [AutomaticRetry(Attempts = 0)] public virtual async Task RunAsync( string engineId, string buildId, - TData data, string? buildOptions, CancellationToken cancellationToken ) @@ -48,14 +26,14 @@ CancellationToken cancellationToken JobCompletionStatus completionStatus = JobCompletionStatus.Completed; try { - await InitializeAsync(engineId, buildId, data, cancellationToken); + await InitializeAsync(engineId, buildId, cancellationToken); if (!await BuildJobService.BuildJobStartedAsync(engineId, buildId, cancellationToken)) { completionStatus = JobCompletionStatus.Canceled; return; } - await DoWorkAsync(engineId, buildId, data, buildOptions, cancellationToken); + await DoWorkAsync(engineId, buildId, buildOptions, cancellationToken); } catch (OperationCanceledException e) { @@ -128,16 +106,11 @@ await BuildJobService.BuildJobFinishedAsync( } finally { - await CleanupAsync(engineId, buildId, data, completionStatus); + await CleanupAsync(engineId, buildId, completionStatus); } } - protected virtual Task InitializeAsync( - string engineId, - string buildId, - TData data, - CancellationToken cancellationToken - ) + protected virtual Task InitializeAsync(string engineId, string buildId, CancellationToken cancellationToken) { return Task.CompletedTask; } @@ -145,17 +118,11 @@ CancellationToken cancellationToken protected abstract Task DoWorkAsync( string engineId, string buildId, - TData data, string? buildOptions, CancellationToken cancellationToken ); - protected virtual Task CleanupAsync( - string engineId, - string buildId, - TData data, - JobCompletionStatus completionStatus - ) + protected virtual Task CleanupAsync(string engineId, string buildId, JobCompletionStatus completionStatus) { return Task.CompletedTask; } diff --git a/src/Machine/src/Serval.Machine.Shared/Services/HangfireBuildJobRunner.cs b/src/Machine/src/Serval.Machine.Shared/Services/HangfireBuildJobRunner.cs index fc91536a6..8c2f3c50b 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/HangfireBuildJobRunner.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/HangfireBuildJobRunner.cs @@ -5,24 +5,6 @@ public class HangfireBuildJobRunner( IEnumerable buildJobFactories ) : IBuildJobRunner { - public static Job CreateJob( - string engineId, - string buildId, - string queue, - object? data, - string? buildOptions - ) - where TEngine : ITrainingEngine - where TJob : HangfireBuildJob - { - ArgumentNullException.ThrowIfNull(data); - // Token "None" is used here because hangfire injects the proper cancellation token - return Job.FromExpression( - j => j.RunAsync(engineId, buildId, (TData)data, buildOptions, CancellationToken.None), - queue - ); - } - public static Job CreateJob(string engineId, string buildId, string queue, string? buildOptions) where TEngine : ITrainingEngine where TJob : HangfireBuildJob @@ -55,13 +37,12 @@ public Task CreateJobAsync( string engineId, string buildId, BuildStage stage, - object? data = null, string? buildOptions = null, CancellationToken cancellationToken = default ) { IHangfireBuildJobFactory buildJobFactory = _buildJobFactories[engineType]; - Job job = buildJobFactory.CreateJob(engineId, buildId, stage, data, buildOptions); + Job job = buildJobFactory.CreateJob(engineId, buildId, stage, buildOptions); return Task.FromResult(_jobClient.Create(job, new ScheduledState(TimeSpan.FromDays(10000)))); } diff --git a/src/Machine/src/Serval.Machine.Shared/Services/IBuildJobRunner.cs b/src/Machine/src/Serval.Machine.Shared/Services/IBuildJobRunner.cs index 83d44c373..ada20b044 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/IBuildJobRunner.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/IBuildJobRunner.cs @@ -11,7 +11,6 @@ Task CreateJobAsync( string engineId, string buildId, BuildStage stage, - object? data = null, string? buildOptions = null, CancellationToken cancellationToken = default ); diff --git a/src/Machine/src/Serval.Machine.Shared/Services/IBuildJobService.cs b/src/Machine/src/Serval.Machine.Shared/Services/IBuildJobService.cs index cea017ba8..327a3f875 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/IBuildJobService.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/IBuildJobService.cs @@ -11,7 +11,7 @@ Task StartBuildJobAsync( string engineId, string buildId, BuildStage stage, - object? data = default, + BuildData? data = null, string? buildOptions = default, CancellationToken cancellationToken = default ); diff --git a/src/Machine/src/Serval.Machine.Shared/Services/IClearMLBuildJobFactory.cs b/src/Machine/src/Serval.Machine.Shared/Services/IClearMLBuildJobFactory.cs index fe265fc6a..3dbd6e2b7 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/IClearMLBuildJobFactory.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/IClearMLBuildJobFactory.cs @@ -9,7 +9,6 @@ Task CreateJobScriptAsync( string buildId, string modelType, BuildStage stage, - object? data = null, string? buildOptions = null, CancellationToken cancellationToken = default ); diff --git a/src/Machine/src/Serval.Machine.Shared/Services/IHangfireBuildJobFactory.cs b/src/Machine/src/Serval.Machine.Shared/Services/IHangfireBuildJobFactory.cs index e57ac8c54..dd7c87edf 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/IHangfireBuildJobFactory.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/IHangfireBuildJobFactory.cs @@ -4,5 +4,5 @@ public interface IHangfireBuildJobFactory { EngineType EngineType { get; } - Job CreateJob(string engineId, string buildId, BuildStage stage, object? data, string? buildOptions); + Job CreateJob(string engineId, string buildId, BuildStage stage, string? buildOptions); } diff --git a/src/Machine/src/Serval.Machine.Shared/Services/NmtClearMLBuildJobFactory.cs b/src/Machine/src/Serval.Machine.Shared/Services/NmtClearMLBuildJobFactory.cs index ee2b07bc7..4ab83dc42 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/NmtClearMLBuildJobFactory.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/NmtClearMLBuildJobFactory.cs @@ -17,7 +17,6 @@ public async Task CreateJobScriptAsync( string buildId, string modelType, BuildStage stage, - object? data = null, string? buildOptions = null, CancellationToken cancellationToken = default ) diff --git a/src/Machine/src/Serval.Machine.Shared/Services/NmtEngineService.cs b/src/Machine/src/Serval.Machine.Shared/Services/NmtEngineService.cs index 8108e1a98..d90917233 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/NmtEngineService.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/NmtEngineService.cs @@ -96,7 +96,7 @@ public async Task StartBuildAsync( engineId, buildId, BuildStage.Preprocess, - corpora, + new BuildData { ParallelCorpora = corpora }, options, cancellationToken ); diff --git a/src/Machine/src/Serval.Machine.Shared/Services/NmtHangfireBuildJobFactory.cs b/src/Machine/src/Serval.Machine.Shared/Services/NmtHangfireBuildJobFactory.cs index c9a4a5b07..4efcd5b1c 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/NmtHangfireBuildJobFactory.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/NmtHangfireBuildJobFactory.cs @@ -6,20 +6,20 @@ public class NmtHangfireBuildJobFactory : IHangfireBuildJobFactory { public EngineType EngineType => EngineType.Nmt; - public Job CreateJob(string engineId, string buildId, BuildStage stage, object? data, string? buildOptions) + public Job CreateJob(string engineId, string buildId, BuildStage stage, string? buildOptions) { return stage switch { - BuildStage.Preprocess => CreateJob< - TranslationEngine, - NmtPreprocessBuildJob, - IReadOnlyList - >(engineId, buildId, BuildJobQueues.Nmt, data, buildOptions), - BuildStage.Postprocess => CreateJob( + BuildStage.Preprocess => CreateJob( + engineId, + buildId, + BuildJobQueues.Nmt, + buildOptions + ), + BuildStage.Postprocess => CreateJob( engineId, buildId, BuildJobQueues.Nmt, - data, buildOptions ), _ => throw new ArgumentException("Unknown build stage.", nameof(stage)), diff --git a/src/Machine/src/Serval.Machine.Shared/Services/PostprocessBuildJob.cs b/src/Machine/src/Serval.Machine.Shared/Services/PostprocessBuildJob.cs index d263d82a3..2a980d50a 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/PostprocessBuildJob.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/PostprocessBuildJob.cs @@ -8,7 +8,7 @@ public abstract class PostprocessBuildJob( ILogger> logger, ISharedFileService sharedFileService, IOptionsMonitor options -) : HangfireBuildJob(platformService, engines, dataAccessContext, buildJobService, logger) +) : HangfireBuildJob(platformService, engines, dataAccessContext, buildJobService, logger) where TEngine : ITrainingEngine { protected ISharedFileService SharedFileService { get; } = sharedFileService; @@ -19,12 +19,7 @@ protected virtual Task SaveModelAsync(string engineId, string buildId) return Task.FromResult(0); } - protected override async Task CleanupAsync( - string engineId, - string buildId, - (int, double) data, - JobCompletionStatus completionStatus - ) + protected override async Task CleanupAsync(string engineId, string buildId, JobCompletionStatus completionStatus) { if (completionStatus is JobCompletionStatus.Restarting) return; diff --git a/src/Machine/src/Serval.Machine.Shared/Services/PreprocessBuildJob.cs b/src/Machine/src/Serval.Machine.Shared/Services/PreprocessBuildJob.cs index fce1c1e7d..7597cf705 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/PreprocessBuildJob.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/PreprocessBuildJob.cs @@ -9,14 +9,7 @@ public abstract class PreprocessBuildJob( ISharedFileService sharedFileService, IParallelCorpusService parallelCorpusService, IOptionsMonitor options -) - : HangfireBuildJob>( - platformService, - engines, - dataAccessContext, - buildJobService, - logger - ) +) : HangfireBuildJob(platformService, engines, dataAccessContext, buildJobService, logger) where TEngine : ITrainingEngine { // Using JavaScriptEncoder.Create(UnicodeRanges.All) to avoid escaping surrogate pairs @@ -36,7 +29,6 @@ IOptionsMonitor options protected override async Task DoWorkAsync( string engineId, string buildId, - IReadOnlyList data, string? buildOptions, CancellationToken cancellationToken ) @@ -45,6 +37,8 @@ CancellationToken cancellationToken if (engine is null) throw new OperationCanceledException($"Engine {engineId} does not exist. Build canceled."); + IReadOnlyList data = engine.CurrentBuild?.Data?.ParallelCorpora ?? []; + (int trainCount, int inferenceCount) = await WriteDataFilesAsync( buildId, data, @@ -112,12 +106,7 @@ CancellationToken cancellationToken CancellationToken cancellationToken ); - protected override async Task CleanupAsync( - string engineId, - string buildId, - IReadOnlyList data, - JobCompletionStatus completionStatus - ) + protected override async Task CleanupAsync(string engineId, string buildId, JobCompletionStatus completionStatus) { if (completionStatus is JobCompletionStatus.Canceled) { diff --git a/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferClearMLBuildJobFactory.cs b/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferClearMLBuildJobFactory.cs index fe97eaeb5..0552c093d 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferClearMLBuildJobFactory.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferClearMLBuildJobFactory.cs @@ -15,7 +15,6 @@ public async Task CreateJobScriptAsync( string buildId, string modelType, BuildStage stage, - object? data = null, string? buildOptions = null, CancellationToken cancellationToken = default ) diff --git a/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferEngineService.cs b/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferEngineService.cs index e142984f4..9d9b711a7 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferEngineService.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferEngineService.cs @@ -199,7 +199,7 @@ public async Task StartBuildAsync( engineId, buildId, BuildStage.Preprocess, - corpora, + new BuildData { ParallelCorpora = corpora }, options, cancellationToken ); diff --git a/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferHangfireBuildJobFactory.cs b/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferHangfireBuildJobFactory.cs index eb2880c90..1cc5ae4fc 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferHangfireBuildJobFactory.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferHangfireBuildJobFactory.cs @@ -6,20 +6,20 @@ public class SmtTransferHangfireBuildJobFactory : IHangfireBuildJobFactory { public EngineType EngineType => EngineType.SmtTransfer; - public Job CreateJob(string engineId, string buildId, BuildStage stage, object? data, string? buildOptions) + public Job CreateJob(string engineId, string buildId, BuildStage stage, string? buildOptions) { return stage switch { - BuildStage.Preprocess => CreateJob< - TranslationEngine, - SmtTransferPreprocessBuildJob, - IReadOnlyList - >(engineId, buildId, BuildJobQueues.SmtTransfer, data, buildOptions), - BuildStage.Postprocess => CreateJob( + BuildStage.Preprocess => CreateJob( + engineId, + buildId, + BuildJobQueues.SmtTransfer, + buildOptions + ), + BuildStage.Postprocess => CreateJob( engineId, buildId, BuildJobQueues.SmtTransfer, - data, buildOptions ), BuildStage.Train => CreateJob( diff --git a/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferPreprocessBuildJob.cs b/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferPreprocessBuildJob.cs index eef6282f9..9f10ec442 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferPreprocessBuildJob.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferPreprocessBuildJob.cs @@ -26,12 +26,7 @@ IOptionsMonitor options private readonly IDistributedReaderWriterLockFactory _lockFactory = lockFactory; private readonly IRepository _trainSegmentPairs = trainSegmentPairs; - protected override async Task InitializeAsync( - string engineId, - string buildId, - IReadOnlyList data, - CancellationToken cancellationToken - ) + protected override async Task InitializeAsync(string engineId, string buildId, CancellationToken cancellationToken) { IDistributedReaderWriterLock @lock = await _lockFactory.CreateAsync(engineId, cancellationToken); await @lock.WriterLockAsync( diff --git a/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferTrainBuildJob.cs b/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferTrainBuildJob.cs index 2047e1b84..41fdda975 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferTrainBuildJob.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferTrainBuildJob.cs @@ -34,7 +34,6 @@ ITransferEngineFactory transferEngineFactory protected override async Task DoWorkAsync( string engineId, string buildId, - object? data, string? buildOptions, CancellationToken cancellationToken ) @@ -56,7 +55,7 @@ CancellationToken cancellationToken // train SMT model string engineDir = Path.Combine(tempDir.Path, "engine"); - (int trainCorpusSize, double confidence) = await TrainAsync( + (int trainCount, double confidence) = await TrainAsync( buildId, engineDir, targetCorpus, @@ -74,20 +73,15 @@ CancellationToken cancellationToken engineId, buildId, BuildStage.Postprocess, - data: (trainCorpusSize, confidence), - buildOptions: buildOptions, - cancellationToken: cancellationToken + new BuildData { CorpusSize = trainCount, Confidence = confidence }, + buildOptions, + cancellationToken ); if (canceling) throw new OperationCanceledException(); } - protected override async Task CleanupAsync( - string engineId, - string buildId, - object? data, - JobCompletionStatus completionStatus - ) + protected override async Task CleanupAsync(string engineId, string buildId, JobCompletionStatus completionStatus) { if (completionStatus is JobCompletionStatus.Canceled) { diff --git a/src/Machine/src/Serval.Machine.Shared/Services/StatisticalEngineService.cs b/src/Machine/src/Serval.Machine.Shared/Services/StatisticalEngineService.cs index 7d04f5262..f808fe494 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/StatisticalEngineService.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/StatisticalEngineService.cs @@ -124,7 +124,7 @@ public async Task StartBuildAsync( engineId, buildId, BuildStage.Preprocess, - corpora, + new BuildData { ParallelCorpora = corpora }, options, cancellationToken ); diff --git a/src/Machine/src/Serval.Machine.Shared/Services/StatisticalHangfireBuildJobFactory.cs b/src/Machine/src/Serval.Machine.Shared/Services/StatisticalHangfireBuildJobFactory.cs index 712b79552..c6d2508ca 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/StatisticalHangfireBuildJobFactory.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/StatisticalHangfireBuildJobFactory.cs @@ -6,20 +6,20 @@ public class StatisticalHangfireBuildJobFactory : IHangfireBuildJobFactory { public EngineType EngineType => EngineType.Statistical; - public Job CreateJob(string engineId, string buildId, BuildStage stage, object? data, string? buildOptions) + public Job CreateJob(string engineId, string buildId, BuildStage stage, string? buildOptions) { return stage switch { - BuildStage.Preprocess => CreateJob< - WordAlignmentEngine, - WordAlignmentPreprocessBuildJob, - IReadOnlyList - >(engineId, buildId, BuildJobQueues.Statistical, data, buildOptions), - BuildStage.Postprocess => CreateJob( + BuildStage.Preprocess => CreateJob( + engineId, + buildId, + BuildJobQueues.Statistical, + buildOptions + ), + BuildStage.Postprocess => CreateJob( engineId, buildId, BuildJobQueues.Statistical, - data, buildOptions ), BuildStage.Train => CreateJob( diff --git a/src/Machine/src/Serval.Machine.Shared/Services/StatisticalPostprocessBuildJob.cs b/src/Machine/src/Serval.Machine.Shared/Services/StatisticalPostprocessBuildJob.cs index 50c2bdeb0..e65c89200 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/StatisticalPostprocessBuildJob.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/StatisticalPostprocessBuildJob.cs @@ -29,12 +29,17 @@ IOptionsMonitor engineOptions protected override async Task DoWorkAsync( string engineId, string buildId, - (int, double) data, string? buildOptions, CancellationToken cancellationToken ) { - (int corpusSize, double confidence) = data; + WordAlignmentEngine? engine = await Engines.GetAsync(e => e.EngineId == engineId, cancellationToken); + if (engine is null) + throw new OperationCanceledException($"Engine {engineId} does not exist. Build canceled."); + + BuildData? data = engine.CurrentBuild?.Data; + + (int corpusSize, double confidence) = (data?.CorpusSize ?? 0, data?.Confidence ?? 0); await using ( Stream wordAlignmentStream = await SharedFileService.OpenReadAsync( diff --git a/src/Machine/src/Serval.Machine.Shared/Services/StatisticalTrainBuildJob.cs b/src/Machine/src/Serval.Machine.Shared/Services/StatisticalTrainBuildJob.cs index 6a5c71bf7..704d7399b 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/StatisticalTrainBuildJob.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/StatisticalTrainBuildJob.cs @@ -30,7 +30,6 @@ IWordAlignmentModelFactory wordAlignmentModelFactory protected override async Task DoWorkAsync( string engineId, string buildId, - object? data, string? buildOptions, CancellationToken cancellationToken ) @@ -68,20 +67,15 @@ CancellationToken cancellationToken engineId, buildId, BuildStage.Postprocess, - buildOptions: buildOptions, - data: (trainCount, 0.0), - cancellationToken: cancellationToken + new BuildData { CorpusSize = trainCount, Confidence = 0.0 }, + buildOptions, + cancellationToken ); if (canceling) throw new OperationCanceledException(); } - protected override async Task CleanupAsync( - string engineId, - string buildId, - object? data, - JobCompletionStatus completionStatus - ) + protected override async Task CleanupAsync(string engineId, string buildId, JobCompletionStatus completionStatus) { if (completionStatus is JobCompletionStatus.Canceled) { diff --git a/src/Machine/src/Serval.Machine.Shared/Services/StatsiticalClearMLBuildJobFactory.cs b/src/Machine/src/Serval.Machine.Shared/Services/StatsiticalClearMLBuildJobFactory.cs index 1e104b2cd..e93958575 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/StatsiticalClearMLBuildJobFactory.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/StatsiticalClearMLBuildJobFactory.cs @@ -15,7 +15,6 @@ public async Task CreateJobScriptAsync( string buildId, string modelType, BuildStage stage, - object? data = null, string? buildOptions = null, CancellationToken cancellationToken = default ) diff --git a/src/Machine/src/Serval.Machine.Shared/Services/TranslationPostprocessBuildJob.cs b/src/Machine/src/Serval.Machine.Shared/Services/TranslationPostprocessBuildJob.cs index 54865f9a9..d43b3ad73 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/TranslationPostprocessBuildJob.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/TranslationPostprocessBuildJob.cs @@ -22,12 +22,17 @@ IOptionsMonitor options protected override async Task DoWorkAsync( string engineId, string buildId, - (int, double) data, string? buildOptions, CancellationToken cancellationToken ) { - (int corpusSize, double confidence) = data; + TranslationEngine? engine = await Engines.GetAsync(e => e.EngineId == engineId, cancellationToken); + if (engine is null) + throw new OperationCanceledException($"Engine {engineId} does not exist. Build canceled."); + + BuildData? data = engine.CurrentBuild?.Data; + + (int corpusSize, double confidence) = (data?.CorpusSize ?? 0, data?.Confidence ?? 0); await using ( Stream pretranslationsStream = await SharedFileService.OpenReadAsync( diff --git a/src/Machine/test/Serval.Machine.Shared.Tests/Services/ClearMLMonitorServiceTests.cs b/src/Machine/test/Serval.Machine.Shared.Tests/Services/ClearMLMonitorServiceTests.cs index 432b31e03..ff7d52141 100644 --- a/src/Machine/test/Serval.Machine.Shared.Tests/Services/ClearMLMonitorServiceTests.cs +++ b/src/Machine/test/Serval.Machine.Shared.Tests/Services/ClearMLMonitorServiceTests.cs @@ -274,7 +274,7 @@ public async Task MonitorClearMLTasksPerDomain_CompletedStatus_ProperlyHandlesCo engine.EngineId, engine.CurrentBuild.BuildId, BuildStage.Postprocess, - Arg.Is<(int, double)>(x => x.Item1 == ExpectedCorpusSize && x.Item2 == ExpectedConfidence), + Arg.Is(x => x.CorpusSize == ExpectedCorpusSize && x.Confidence == ExpectedConfidence), engine.CurrentBuild.Options, Arg.Any() ) @@ -301,7 +301,7 @@ await _translationBuildJobService engine.EngineId, engine.CurrentBuild.BuildId, BuildStage.Postprocess, - Arg.Is<(int, double)>(x => x.Item1 == ExpectedCorpusSize && x.Item2 == ExpectedConfidence), + Arg.Is(x => x.CorpusSize == ExpectedCorpusSize && x.Confidence == ExpectedConfidence), engine.CurrentBuild.Options, Arg.Any() ); diff --git a/src/Machine/test/Serval.Machine.Shared.Tests/Services/NmtEngineServiceTests.cs b/src/Machine/test/Serval.Machine.Shared.Tests/Services/NmtEngineServiceTests.cs index 92d9af766..b44b1772e 100644 --- a/src/Machine/test/Serval.Machine.Shared.Tests/Services/NmtEngineServiceTests.cs +++ b/src/Machine/test/Serval.Machine.Shared.Tests/Services/NmtEngineServiceTests.cs @@ -297,7 +297,7 @@ await BuildJobService.StartBuildJobAsync( "engine1", "build1", BuildStage.Postprocess, - (0, 0.0) + new BuildData { CorpusSize = 0, Confidence = 0.0 } ); } diff --git a/src/Machine/test/Serval.Machine.Shared.Tests/Services/PreprocessBuildJobTests.cs b/src/Machine/test/Serval.Machine.Shared.Tests/Services/PreprocessBuildJobTests.cs index 1e9901ba0..30c1bb68a 100644 --- a/src/Machine/test/Serval.Machine.Shared.Tests/Services/PreprocessBuildJobTests.cs +++ b/src/Machine/test/Serval.Machine.Shared.Tests/Services/PreprocessBuildJobTests.cs @@ -4,14 +4,17 @@ namespace Serval.Machine.Shared.Services; public class PreprocessBuildJobTests { [Test] - public void RunAsync_NothingToInference() + public async Task RunAsync_NothingToInference() { TestEnvironment env = new(); ParallelCorpusContract corpus1 = TestEnvironment.TextFileCorpus(trainOnTextIds: null, inferenceTextIds: []); - + await env.Engines.UpdateAsync( + e => e.EngineId == "engine1" && e.CurrentBuild != null, + u => u.Set(e => e.CurrentBuild!.Data, new BuildData { ParallelCorpora = [corpus1] }) + ); Assert.ThrowsAsync(async () => { - await env.RunBuildJobAsync(corpus1); + await env.RunBuildJobAsync(); }); } @@ -41,6 +44,10 @@ public async Task RunAsync_BuildWarnings() }, ], }; + await env.Engines.UpdateAsync( + e => e.EngineId == "engine4" && e.CurrentBuild != null, + u => u.Set(e => e.CurrentBuild!.Data, new BuildData { ParallelCorpora = [corpus1] }) + ); env.ParallelCorpusService.AnalyzeUsfmVersification(Arg.Any>()) .Returns([ ( @@ -65,24 +72,27 @@ public async Task RunAsync_BuildWarnings() ), ]); - await env.RunBuildJobAsync(corpus1, engineId: "engine4"); + await env.RunBuildJobAsync(engineId: "engine4"); Assert.That(env.ExecutionData.Warnings, Has.Count.EqualTo(2)); env.BuildJobOptions.CurrentValue.Returns(new BuildJobOptions() { MaxWarnings = 1 }); - await env.RunBuildJobAsync(corpus1, engineId: "engine4"); + await env.RunBuildJobAsync(engineId: "engine4"); // Two warnings after truncation + one warning mentioning that warnings were truncated Assert.That(env.ExecutionData.Warnings, Has.Count.EqualTo(2)); } [Test] - public void RunAsync_UnknownLanguageTagsNoData() + public async Task RunAsync_UnknownLanguageTagsNoData() { TestEnvironment env = new(); ParallelCorpusContract corpus1 = TestEnvironment.TextFileCorpus(sourceLanguage: "xxx", targetLanguage: "zzz"); - + await env.Engines.UpdateAsync( + e => e.EngineId == "engine2" && e.CurrentBuild != null, + u => u.Set(e => e.CurrentBuild!.Data, new BuildData { ParallelCorpora = [corpus1] }) + ); Assert.ThrowsAsync(async () => { - await env.RunBuildJobAsync(corpus1, engineId: "engine2"); + await env.RunBuildJobAsync(engineId: "engine2"); }); } @@ -91,8 +101,12 @@ public async Task RunAsync_UnknownLanguageTagsNoDataSmtTransfer() { TestEnvironment env = new(); ParallelCorpusContract corpus1 = TestEnvironment.TextFileCorpus(sourceLanguage: "xxx", targetLanguage: "zzz"); + await env.Engines.UpdateAsync( + e => e.EngineId == "engine3" && e.CurrentBuild != null, + u => u.Set(e => e.CurrentBuild!.Data, new BuildData { ParallelCorpora = [corpus1] }) + ); - await env.RunBuildJobAsync(corpus1, engineId: "engine3", engineType: EngineType.SmtTransfer); + await env.RunBuildJobAsync(engineId: "engine3", engineType: EngineType.SmtTransfer); } private class TestEnvironment @@ -381,30 +395,13 @@ public static ParallelCorpusContract TextFileCorpus(string sourceLanguage, strin } public Task RunBuildJobAsync( - ParallelCorpusContract corpus, - bool useKeyTerms = true, - string engineId = "engine1", - EngineType engineType = EngineType.Nmt - ) - { - return RunBuildJobAsync([corpus], useKeyTerms, engineId, engineType); - } - - public Task RunBuildJobAsync( - IEnumerable corpora, bool useKeyTerms = true, string engineId = "engine1", EngineType engineType = EngineType.Nmt ) { return GetBuildJob(engineType) - .RunAsync( - engineId, - "build1", - corpora.ToList(), - useKeyTerms ? null : "{\"use_key_terms\":false}", - default - ); + .RunAsync(engineId, "build1", useKeyTerms ? null : "{\"use_key_terms\":false}", default); } public static CorpusFileContract ParatextFile(string name) diff --git a/src/Machine/test/Serval.Machine.Shared.Tests/Services/SmtTransferEngineServiceTests.cs b/src/Machine/test/Serval.Machine.Shared.Tests/Services/SmtTransferEngineServiceTests.cs index 168320816..3a43dd90e 100644 --- a/src/Machine/test/Serval.Machine.Shared.Tests/Services/SmtTransferEngineServiceTests.cs +++ b/src/Machine/test/Serval.Machine.Shared.Tests/Services/SmtTransferEngineServiceTests.cs @@ -689,7 +689,7 @@ await BuildJobService.StartBuildJobAsync( EngineId1, BuildId1, BuildStage.Postprocess, - data: (0, 0.0) + new BuildData { CorpusSize = 0, Confidence = 0.0 } ); } catch (OperationCanceledException) diff --git a/src/Machine/test/Serval.Machine.Shared.Tests/Services/StatisticalEngineServiceTests.cs b/src/Machine/test/Serval.Machine.Shared.Tests/Services/StatisticalEngineServiceTests.cs index f66587d8c..db38ce3aa 100644 --- a/src/Machine/test/Serval.Machine.Shared.Tests/Services/StatisticalEngineServiceTests.cs +++ b/src/Machine/test/Serval.Machine.Shared.Tests/Services/StatisticalEngineServiceTests.cs @@ -435,7 +435,7 @@ await BuildJobService.StartBuildJobAsync( EngineId1, BuildId1, BuildStage.Postprocess, - data: (0, 0.0) + new BuildData { CorpusSize = 0, Confidence = 0.0 } ); } catch (OperationCanceledException) From 510289314c33349679e895e4a1f7e0213e14c9ed Mon Sep 17 00:00:00 2001 From: Enkidu93 Date: Thu, 14 May 2026 17:45:45 -0400 Subject: [PATCH 6/9] Remove unnecessary transaction --- .../Services/BuildJobRunnerManager.cs | 34 ++++++++----------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs b/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs index b1fd8bf20..30979dec4 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs @@ -53,27 +53,21 @@ TEngine engine in await engines.GetAllAsync( string? jobId = null; try { - await dataAccessContext.WithTransactionAsync( - async (ct) => - { - await engines.UpdateAsync( - e => e.EngineId == engine.Id, - u => u.Set(e => e.CurrentBuild!.JobState, BuildJobState.Pending), - cancellationToken: ct - ); - jobId = await runners[build.BuildJobRunner] - .CreateJobAsync( - engine.Type, - engine.EngineId, - build.BuildId, - build.Stage, - build.Options, - ct - ); - await runners[build.BuildJobRunner].EnqueueJobAsync(jobId, engine.Type, cancellationToken); - }, - cancellationToken: CancellationToken.None + await engines.UpdateAsync( + e => e.EngineId == engine.Id, + u => u.Set(e => e.CurrentBuild!.JobState, BuildJobState.Pending), + cancellationToken: cancellationToken ); + jobId = await runners[build.BuildJobRunner] + .CreateJobAsync( + engine.Type, + engine.EngineId, + build.BuildId, + build.Stage, + build.Options, + cancellationToken + ); + await runners[build.BuildJobRunner].EnqueueJobAsync(jobId, engine.Type, cancellationToken); } catch (Exception e) { From 290e74b1ae420f8b47396cee2a0cd681de675588 Mon Sep 17 00:00:00 2001 From: Enkidu93 Date: Thu, 14 May 2026 18:01:05 -0400 Subject: [PATCH 7/9] Fix jobId setting --- .../Services/BuildJobRunnerManager.cs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs b/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs index 30979dec4..74012a218 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs @@ -46,10 +46,6 @@ TEngine engine in await engines.GetAllAsync( ) { Build build = engine.CurrentBuild!; - if (!string.IsNullOrEmpty(build.JobId)) - //TODO - should these be cleaned up? - continue; - string? jobId = null; try { @@ -67,6 +63,11 @@ await engines.UpdateAsync( build.Options, cancellationToken ); + await engines.UpdateAsync( + e => e.EngineId == engine.Id, + u => u.Set(e => e.CurrentBuild!.JobId, jobId), + cancellationToken: cancellationToken + ); await runners[build.BuildJobRunner].EnqueueJobAsync(jobId, engine.Type, cancellationToken); } catch (Exception e) From 7fa3654db5beaab435dfdd86c1fa483c82b75a53 Mon Sep 17 00:00:00 2001 From: Enkidu93 Date: Fri, 15 May 2026 09:34:09 -0400 Subject: [PATCH 8/9] Fix engine.EngineId not engine.Id --- .../Services/BuildJobRunnerManager.cs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs b/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs index 74012a218..40708dbf3 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs @@ -46,11 +46,14 @@ TEngine engine in await engines.GetAllAsync( ) { Build build = engine.CurrentBuild!; + if (!string.IsNullOrEmpty(build.JobId)) + //TODO - should these be cleaned up? + continue; string? jobId = null; try { await engines.UpdateAsync( - e => e.EngineId == engine.Id, + e => e.EngineId == engine.EngineId, u => u.Set(e => e.CurrentBuild!.JobState, BuildJobState.Pending), cancellationToken: cancellationToken ); @@ -64,7 +67,7 @@ await engines.UpdateAsync( cancellationToken ); await engines.UpdateAsync( - e => e.EngineId == engine.Id, + e => e.EngineId == engine.EngineId, u => u.Set(e => e.CurrentBuild!.JobId, jobId), cancellationToken: cancellationToken ); @@ -79,7 +82,7 @@ await dataAccessContext.WithTransactionAsync( await platformService.BuildFaultedAsync(build.BuildId, e.Message, CancellationToken.None); await engines.UpdateAsync( e => - e.EngineId == engine.Id + e.EngineId == engine.EngineId && e.CurrentBuild != null && e.CurrentBuild.BuildId == build.BuildId, u => @@ -152,7 +155,7 @@ TEngine engine in await engines.GetAllAsync( IBuildJobRunner runner = runners[runnerType]; try { - await runner.DeleteEngineAsync(engine.Id, cancellationToken); + await runner.DeleteEngineAsync(engine.EngineId, cancellationToken); } catch (Exception e) { From def8412c1bdcb3099c676d691cb30a83b73f0deb Mon Sep 17 00:00:00 2001 From: Enkidu93 Date: Fri, 15 May 2026 09:40:45 -0400 Subject: [PATCH 9/9] Fix TODOs --- .../Serval.Machine.Shared/Services/BuildJobRunnerManager.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs b/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs index 40708dbf3..c4a41f992 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/BuildJobRunnerManager.cs @@ -47,7 +47,7 @@ TEngine engine in await engines.GetAllAsync( { Build build = engine.CurrentBuild!; if (!string.IsNullOrEmpty(build.JobId)) - //TODO - should these be cleaned up? + //TODO - how should these be cleaned up if somehow they existed? Just marked as failed? continue; string? jobId = null; try @@ -116,7 +116,7 @@ TEngine engine in await engines.GetAllAsync( { Build build = engine.CurrentBuild!; if (string.IsNullOrEmpty(build.JobId)) - //TODO - should these be cleaned up? + //TODO - should these be cleaned up? I think we can just do nothing since the only responsibility of the this function is to stop the job and the job is already 'stopped', right? continue; try @@ -142,7 +142,7 @@ private static async Task DeleteDeletingEngines( CancellationToken cancellationToken ) { - //TODO what about non-building engines? For ClearML this would still be needed + //TODO what about non-building engines? For ClearML this would still be needed. This may just need to be a new flag on the engine itself instead of a build state. foreach ( TEngine engine in await engines.GetAllAsync( e => e.CurrentBuild != null && e.CurrentBuild.JobState == BuildJobState.Deleting,