diff --git a/.github/workflows/ci-e2e.yml b/.github/workflows/ci-e2e.yml index 9081eef0c..3d47b8a20 100644 --- a/.github/workflows/ci-e2e.yml +++ b/.github/workflows/ci-e2e.yml @@ -28,20 +28,6 @@ jobs: steps: - uses: actions/checkout@v6 - # get version of machine.py - MACHINE_PY_IMAGE will force the docker compose to use the proper version of machine.py - - name: Install regctl - uses: iarekylew00t/regctl-installer@v4.0.8 - - - name: Set proper version of Machine.py - run: | - export MACHINE_PY_IMAGE=ghcr.io/sillsdev/machine.py:$(regctl image config ghcr.io/sillsdev/machine.py | jq -r ".config.Labels[\"org.opencontainers.image.version\"]") && \ - echo "MACHINE_PY_IMAGE=$MACHINE_PY_IMAGE" >> $GITHUB_ENV && \ - echo "MACHINE_PY_CPU_IMAGE=$MACHINE_PY_IMAGE.cpu_only" >> $GITHUB_ENV - - - name: Confirm proper version of Machine.py - run: | - echo $MACHINE_PY_IMAGE $MACHINE_PY_CPU_IMAGE - - name: Setup .NET uses: actions/setup-dotnet@v5 with: diff --git a/docker-compose.yml b/docker-compose.yml index 3b3f11780..ce6a2a8d9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -20,11 +20,11 @@ services: - "ClearML__AccessKey=${ClearML_AccessKey:?access key needed}" - "ClearML__SecretKey=${ClearML_SecretKey:?secret key needed}" - BuildJob__ClearML__0__Queue=${CLEARML_GPU_QUEUE:-lambert_24gb} - - BuildJob__ClearML__0__DockerImage=${MACHINE_PY_IMAGE:-ghcr.io/sillsdev/machine.py:latest} + - BuildJob__ClearML__0__DockerImage=${MACHINE_PY_IMAGE:-ghcr.io/sillsdev/machine.py:1.12.0} - BuildJob__ClearML__1__Queue=${CLEARML_CPU_QUEUE:-lambert_24gb.cpu_only} - - BuildJob__ClearML__1__DockerImage=${MACHINE_PY_CPU_IMAGE:-ghcr.io/sillsdev/machine.py:latest.cpu_only} + - BuildJob__ClearML__1__DockerImage=${MACHINE_PY_CPU_IMAGE:-ghcr.io/sillsdev/machine.py:1.12.0.cpu_only} - BuildJob__ClearML__2__Queue=${CLEARML_CPU_QUEUE:-lambert_24gb.cpu_only} - - BuildJob__ClearML__2__DockerImage=${MACHINE_PY_CPU_IMAGE:-ghcr.io/sillsdev/machine.py:latest.cpu_only} + - BuildJob__ClearML__2__DockerImage=${MACHINE_PY_CPU_IMAGE:-ghcr.io/sillsdev/machine.py:1.12.0.cpu_only} - SharedFile__Uri=s3://silnlp/docker-compose/ - "SharedFile__S3AccessKeyId=${AWS_ACCESS_KEY_ID:?access key needed}" - "SharedFile__S3SecretAccessKey=${AWS_SECRET_ACCESS_KEY:?secret key needed}" @@ -42,7 +42,7 @@ services: - ~/.nuget/packages:/root/.nuget/packages:ro - /var/lib/machine:/var/lib/machine - /var/lib/serval:/var/lib/serval - working_dir: '/app/src/Serval/src/Serval.ApiServer' + working_dir: "/app/src/Serval/src/Serval.ApiServer" entrypoint: - dotnet - run @@ -63,7 +63,7 @@ services: # then hang forever so the container does not exit command: [ - '/bin/sh', - '-c', - 'mongod --quiet --replSet myRS --bind_ip 0.0.0.0 & sleep 2s; mongosh --host localhost:27017 --eval '' config = { "_id" : "myRS", "members" : [{"_id" : 0,"host" : "mongo:27017"}] }; rs.initiate(config, { force: true }); '' ; sleep infinity' - ] \ No newline at end of file + "/bin/sh", + "-c", + 'mongod --quiet --replSet myRS --bind_ip 0.0.0.0 & sleep 2s; mongosh --host localhost:27017 --eval '' config = { "_id" : "myRS", "members" : [{"_id" : 0,"host" : "mongo:27017"}] }; rs.initiate(config, { force: true }); '' ; sleep infinity', + ] diff --git a/src/Echo/src/EchoEngine/TranslationEngineService.cs b/src/Echo/src/EchoEngine/TranslationEngineService.cs index e07fd36d5..fb9e23703 100644 --- a/src/Echo/src/EchoEngine/TranslationEngineService.cs +++ b/src/Echo/src/EchoEngine/TranslationEngineService.cs @@ -180,6 +180,16 @@ await _taskQueue.QueueBackgroundWorkItemAsync( try { + //Wait for build to exist in the database before starting the build. + TimeSpan timeout = TimeSpan.FromSeconds(60); + DateTime start = DateTime.UtcNow; + while ( + (DateTime.UtcNow - start < timeout) + && !await platform.BuildExistsAsync(buildId, linkedCts.Token) + ) + { + await Task.Delay(TimeSpan.FromMilliseconds(10), linkedCts.Token); + } await platform.BuildStartedAsync(buildId, linkedCts.Token); int trainCount = 0; diff --git a/src/Echo/src/EchoEngine/WordAlignmentEngineService.cs b/src/Echo/src/EchoEngine/WordAlignmentEngineService.cs index 491fdec5c..de363cbf0 100644 --- a/src/Echo/src/EchoEngine/WordAlignmentEngineService.cs +++ b/src/Echo/src/EchoEngine/WordAlignmentEngineService.cs @@ -94,6 +94,16 @@ await _taskQueue.QueueBackgroundWorkItemAsync( try { + //Wait for build to exist in the database before starting the build. + TimeSpan timeout = TimeSpan.FromSeconds(60); + DateTime start = DateTime.UtcNow; + while ( + (DateTime.UtcNow - start < timeout) + && !await platform.BuildExistsAsync(buildId, linkedCts.Token) + ) + { + await Task.Delay(TimeSpan.FromMilliseconds(10), linkedCts.Token); + } await platform.BuildStartedAsync(buildId, linkedCts.Token); int trainCount = 0; diff --git a/src/Machine/src/Serval.Machine.Shared/Configuration/IServalConfiguratorExtensions.cs b/src/Machine/src/Serval.Machine.Shared/Configuration/IServalConfiguratorExtensions.cs index 726824792..4cfed5a59 100644 --- a/src/Machine/src/Serval.Machine.Shared/Configuration/IServalConfiguratorExtensions.cs +++ b/src/Machine/src/Serval.Machine.Shared/Configuration/IServalConfiguratorExtensions.cs @@ -61,11 +61,9 @@ private static IServalConfigurator AddTranslationEngines(this IServalConfigurato configurator.Services.AddSingleton(); configurator.Services.AddSingleton(); configurator.AddTranslationEngine(EngineType.SmtTransfer.ToString()); - configurator.JobQueues.Add(BuildJobQueues.SmtTransfer); // NMT Engine configurator.AddTranslationEngine(EngineType.Nmt.ToString()); - configurator.JobQueues.Add(BuildJobQueues.Nmt); return configurator; } @@ -84,7 +82,6 @@ private static IServalConfigurator AddWordAlignmentEngines(this IServalConfigura configurator.Services.AddSingleton(); configurator.AddWordAlignmentEngine(EngineType.Statistical.ToString()); configurator.Services.AddHostedService(); - configurator.JobQueues.Add(BuildJobQueues.Statistical); return configurator; } @@ -206,10 +203,12 @@ private static IServalConfigurator AddBuildJobService(this IServalConfigurator c configurator.Services.AddSingleton(x => x.GetRequiredService()); configurator.Services.AddHostedService(p => p.GetRequiredService()); - configurator.Services.AddScoped(); - configurator.Services.AddScoped(); - configurator.Services.AddScoped(); - configurator.Services.AddScoped(); + configurator.Services.AddSingleton(); + configurator.Services.AddSingleton(sp => sp.GetRequiredService()); + configurator.Services.AddHostedService(sp => sp.GetRequiredService()); + configurator.Services.AddSingleton(); + configurator.Services.AddSingleton(); + configurator.Services.AddSingleton(); var smtTransferEngineOptions = new SmtTransferEngineOptions(); configurator.Configuration.GetSection(SmtTransferEngineOptions.Key).Bind(smtTransferEngineOptions); diff --git a/src/Machine/src/Serval.Machine.Shared/Models/Build.cs b/src/Machine/src/Serval.Machine.Shared/Models/Build.cs index 38a3bc739..4a7df99bf 100644 --- a/src/Machine/src/Serval.Machine.Shared/Models/Build.cs +++ b/src/Machine/src/Serval.Machine.Shared/Models/Build.cs @@ -10,8 +10,8 @@ public enum BuildJobState public enum BuildJobRunnerType { - Hangfire, ClearML, + Local, } public enum BuildStage @@ -28,6 +28,8 @@ public record Build public required string JobId { get; init; } public required BuildJobRunnerType BuildJobRunner { get; init; } public required BuildStage Stage { get; init; } + public DateTimeOffset QueuedAt { get; init; } public string? Options { get; set; } + public string? JobData { get; init; } public required BuildExecutionData ExecutionData { get; init; } } diff --git a/src/Machine/src/Serval.Machine.Shared/Serval.Machine.Shared.csproj b/src/Machine/src/Serval.Machine.Shared/Serval.Machine.Shared.csproj index 9d0f74bad..1ba9e969d 100644 --- a/src/Machine/src/Serval.Machine.Shared/Serval.Machine.Shared.csproj +++ b/src/Machine/src/Serval.Machine.Shared/Serval.Machine.Shared.csproj @@ -34,12 +34,11 @@ - - - - + + + diff --git a/src/Machine/src/Serval.Machine.Shared/Services/HangfireBuildJob.cs b/src/Machine/src/Serval.Machine.Shared/Services/BuildJob.cs similarity index 83% rename from src/Machine/src/Serval.Machine.Shared/Services/HangfireBuildJob.cs rename to src/Machine/src/Serval.Machine.Shared/Services/BuildJob.cs index 0b40ee6e5..f2f877db4 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/HangfireBuildJob.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/BuildJob.cs @@ -1,15 +1,14 @@ -namespace Serval.Machine.Shared.Services; +namespace Serval.Machine.Shared.Services; -public abstract class HangfireBuildJob( +public abstract class BuildJob( IPlatformService platformService, IRepository engines, IDataAccessContext dataAccessContext, IBuildJobService buildJobService, - ILogger> logger -) : HangfireBuildJob(platformService, engines, dataAccessContext, buildJobService, logger) + ILogger> logger +) : BuildJob(platformService, engines, dataAccessContext, buildJobService, logger) where TEngine : ITrainingEngine { - [AutomaticRetry(Attempts = 0)] public virtual Task RunAsync( string engineId, string buildId, @@ -21,12 +20,12 @@ CancellationToken cancellationToken } } -public abstract class HangfireBuildJob( +public abstract class BuildJob( IPlatformService platformService, IRepository engines, IDataAccessContext dataAccessContext, IBuildJobService buildJobService, - ILogger> logger + ILogger> logger ) where TEngine : ITrainingEngine { @@ -34,9 +33,8 @@ ILogger> logger protected IRepository Engines { get; } = engines; protected IDataAccessContext DataAccessContext { get; } = dataAccessContext; protected IBuildJobService BuildJobService { get; } = buildJobService; - protected ILogger> Logger { get; } = logger; + protected ILogger> Logger { get; } = logger; - [AutomaticRetry(Attempts = 0)] public virtual async Task RunAsync( string engineId, string buildId, @@ -59,8 +57,7 @@ CancellationToken cancellationToken } catch (OperationCanceledException e) { - // Log the full exception for debugging purposes - Logger.LogInformation(e, "Build Hangfire job canceled ({0})", buildId); + Logger.LogInformation(e, "Build job canceled ({0})", buildId); // Check if the cancellation was initiated by an API call or a shutdown. TEngine? engine = await Engines.GetAsync( @@ -87,8 +84,7 @@ await BuildJobService.BuildJobFinishedAsync( } else if (engine is not null) { - // the build was canceled, because of a server shutdown - // switch state back to pending + // the build was canceled because of a server shutdown — switch state back to pending completionStatus = JobCompletionStatus.Restarting; await DataAccessContext.WithTransactionAsync( async (ct) => @@ -128,7 +124,7 @@ await BuildJobService.BuildJobFinishedAsync( } finally { - await CleanupAsync(engineId, buildId, data, completionStatus); + await CleanupAsync(engineId, buildId, completionStatus); } } @@ -150,12 +146,7 @@ protected abstract Task DoWorkAsync( CancellationToken cancellationToken ); - protected virtual Task CleanupAsync( - string engineId, - string buildId, - TData data, - JobCompletionStatus completionStatus - ) + protected virtual Task CleanupAsync(string engineId, string buildId, JobCompletionStatus completionStatus) { return Task.CompletedTask; } diff --git a/src/Machine/src/Serval.Machine.Shared/Services/BuildJobQueues.cs b/src/Machine/src/Serval.Machine.Shared/Services/BuildJobQueues.cs deleted file mode 100644 index dd66f6e81..000000000 --- a/src/Machine/src/Serval.Machine.Shared/Services/BuildJobQueues.cs +++ /dev/null @@ -1,8 +0,0 @@ -namespace Serval.Machine.Shared.Services; - -public static class BuildJobQueues -{ - public const string Nmt = "nmt"; - public const string SmtTransfer = "smt_transfer"; - public const string Statistical = "statistical"; -} diff --git a/src/Machine/src/Serval.Machine.Shared/Services/BuildJobService.cs b/src/Machine/src/Serval.Machine.Shared/Services/BuildJobService.cs index b02b5aeb7..c375a5556 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/BuildJobService.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/BuildJobService.cs @@ -70,7 +70,7 @@ public async Task StartBuildJobAsync( ) { IBuildJobRunner runner = Runners[runnerType]; - string jobId = await runner.CreateJobAsync( + (string jobId, string? jobData) = await runner.CreateJobAsync( engineType, engineId, buildId, @@ -102,7 +102,9 @@ public async Task StartBuildJobAsync( BuildJobRunner = runner.Type, Stage = stage, JobState = BuildJobState.Pending, + QueuedAt = DateTimeOffset.UtcNow, Options = buildOptions, + JobData = jobData, ExecutionData = new BuildExecutionData(), } ), diff --git a/src/Machine/src/Serval.Machine.Shared/Services/BuildProgress.cs b/src/Machine/src/Serval.Machine.Shared/Services/BuildProgress.cs deleted file mode 100644 index 88422c6cf..000000000 --- a/src/Machine/src/Serval.Machine.Shared/Services/BuildProgress.cs +++ /dev/null @@ -1,25 +0,0 @@ -namespace Serval.Machine.Shared.Services; - -public class BuildProgress(IPlatformService platformService, string buildId) : IProgress -{ - private readonly IPlatformService _platformService = platformService; - private readonly string _buildId = buildId; - private ProgressStatus _prevStatus; - - private DateTime _lastReportTime = DateTime.Now; - - private const float ThrottleTimeSeconds = 1; - - public void Report(ProgressStatus value) - { - if (_prevStatus.Equals(value)) - return; - - if (DateTime.Now < _lastReportTime.AddSeconds(ThrottleTimeSeconds)) - return; - - _lastReportTime = DateTime.Now; - _platformService.UpdateBuildStatusAsync(_buildId, value); - _prevStatus = value; - } -} diff --git a/src/Machine/src/Serval.Machine.Shared/Services/ClearMLBuildJobRunner.cs b/src/Machine/src/Serval.Machine.Shared/Services/ClearMLBuildJobRunner.cs index e47ac4b67..32614ff8a 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/ClearMLBuildJobRunner.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/ClearMLBuildJobRunner.cs @@ -32,7 +32,7 @@ public async Task DeleteEngineAsync(string engineId, CancellationToken cancellat await _clearMLService.DeleteProjectAsync(projectId, cancellationToken); } - public async Task CreateJobAsync( + public async Task<(string JobId, string? JobData)> CreateJobAsync( EngineType engineType, string engineId, string buildId, @@ -47,7 +47,7 @@ public async Task CreateJobAsync( ClearMLTask? task = await _clearMLService.GetTaskByNameAsync(buildId, cancellationToken); if (task is not null) - return task.Id; + return (task.Id, null); IClearMLBuildJobFactory buildJobFactory = _buildJobFactories[engineType]; string script = await buildJobFactory.CreateJobScriptAsync( @@ -55,17 +55,17 @@ public async Task CreateJobAsync( buildId, _options[engineType].ModelType, stage, - data, buildOptions, cancellationToken ); - return await _clearMLService.CreateTaskAsync( + string jobId = await _clearMLService.CreateTaskAsync( buildId, projectId, script, _options[engineType].DockerImage, cancellationToken ); + return (jobId, null); } public Task DeleteJobAsync(string jobId, CancellationToken cancellationToken = default) diff --git a/src/Machine/src/Serval.Machine.Shared/Services/ClearMLMonitorService.cs b/src/Machine/src/Serval.Machine.Shared/Services/ClearMLMonitorService.cs index 90323234d..46106c5e7 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/ClearMLMonitorService.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/ClearMLMonitorService.cs @@ -300,7 +300,7 @@ CancellationToken cancellationToken try { return await buildJobService.StartBuildJobAsync( - BuildJobRunnerType.Hangfire, + BuildJobRunnerType.Local, engineType, engineId, buildId, diff --git a/src/Machine/src/Serval.Machine.Shared/Services/HangfireBuildJobRunner.cs b/src/Machine/src/Serval.Machine.Shared/Services/HangfireBuildJobRunner.cs deleted file mode 100644 index fc91536a6..000000000 --- a/src/Machine/src/Serval.Machine.Shared/Services/HangfireBuildJobRunner.cs +++ /dev/null @@ -1,87 +0,0 @@ -namespace Serval.Machine.Shared.Services; - -public class HangfireBuildJobRunner( - IBackgroundJobClient jobClient, - IEnumerable buildJobFactories -) : IBuildJobRunner -{ - public static Job CreateJob( - string engineId, - string buildId, - string queue, - object? data, - string? buildOptions - ) - where TEngine : ITrainingEngine - where TJob : HangfireBuildJob - { - ArgumentNullException.ThrowIfNull(data); - // Token "None" is used here because hangfire injects the proper cancellation token - return Job.FromExpression( - j => j.RunAsync(engineId, buildId, (TData)data, buildOptions, CancellationToken.None), - queue - ); - } - - public static Job CreateJob(string engineId, string buildId, string queue, string? buildOptions) - where TEngine : ITrainingEngine - where TJob : HangfireBuildJob - { - // Token "None" is used here because hangfire injects the proper cancellation token - return Job.FromExpression( - j => j.RunAsync(engineId, buildId, buildOptions, CancellationToken.None), - queue - ); - } - - private readonly IBackgroundJobClient _jobClient = jobClient; - private readonly Dictionary _buildJobFactories = - buildJobFactories.ToDictionary(f => f.EngineType); - - public BuildJobRunnerType Type => BuildJobRunnerType.Hangfire; - - public Task CreateEngineAsync(string engineId, string? name = null, CancellationToken cancellationToken = default) - { - return Task.CompletedTask; - } - - public Task DeleteEngineAsync(string engineId, CancellationToken cancellationToken = default) - { - return Task.CompletedTask; - } - - public Task CreateJobAsync( - EngineType engineType, - string engineId, - string buildId, - BuildStage stage, - object? data = null, - string? buildOptions = null, - CancellationToken cancellationToken = default - ) - { - IHangfireBuildJobFactory buildJobFactory = _buildJobFactories[engineType]; - Job job = buildJobFactory.CreateJob(engineId, buildId, stage, data, buildOptions); - return Task.FromResult(_jobClient.Create(job, new ScheduledState(TimeSpan.FromDays(10000)))); - } - - public Task DeleteJobAsync(string jobId, CancellationToken cancellationToken = default) - { - return Task.FromResult(_jobClient.Delete(jobId)); - } - - public Task EnqueueJobAsync( - string jobId, - EngineType engineType, - CancellationToken cancellationToken = default - ) - { - return Task.FromResult(_jobClient.Requeue(jobId)); - } - - public Task StopJobAsync(string jobId, CancellationToken cancellationToken = default) - { - // Trigger the cancellation token for the job - return Task.FromResult(_jobClient.Delete(jobId)); - } -} diff --git a/src/Machine/src/Serval.Machine.Shared/Services/IBuildJobRunner.cs b/src/Machine/src/Serval.Machine.Shared/Services/IBuildJobRunner.cs index 0c04cbde9..4196ccc06 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/IBuildJobRunner.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/IBuildJobRunner.cs @@ -7,7 +7,7 @@ public interface IBuildJobRunner Task CreateEngineAsync(string engineId, string? name = null, CancellationToken cancellationToken = default); Task DeleteEngineAsync(string engineId, CancellationToken cancellationToken = default); - Task CreateJobAsync( + Task<(string JobId, string? JobData)> CreateJobAsync( EngineType engineType, string engineId, string buildId, diff --git a/src/Machine/src/Serval.Machine.Shared/Services/IClearMLBuildJobFactory.cs b/src/Machine/src/Serval.Machine.Shared/Services/IClearMLBuildJobFactory.cs index fe265fc6a..3dbd6e2b7 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/IClearMLBuildJobFactory.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/IClearMLBuildJobFactory.cs @@ -9,7 +9,6 @@ Task CreateJobScriptAsync( string buildId, string modelType, BuildStage stage, - object? data = null, string? buildOptions = null, CancellationToken cancellationToken = default ); diff --git a/src/Machine/src/Serval.Machine.Shared/Services/IHangfireBuildJobFactory.cs b/src/Machine/src/Serval.Machine.Shared/Services/IHangfireBuildJobFactory.cs deleted file mode 100644 index e57ac8c54..000000000 --- a/src/Machine/src/Serval.Machine.Shared/Services/IHangfireBuildJobFactory.cs +++ /dev/null @@ -1,8 +0,0 @@ -namespace Serval.Machine.Shared.Services; - -public interface IHangfireBuildJobFactory -{ - EngineType EngineType { get; } - - Job CreateJob(string engineId, string buildId, BuildStage stage, object? data, string? buildOptions); -} diff --git a/src/Machine/src/Serval.Machine.Shared/Services/ILocalBuildJobFactory.cs b/src/Machine/src/Serval.Machine.Shared/Services/ILocalBuildJobFactory.cs new file mode 100644 index 000000000..55cc1d4aa --- /dev/null +++ b/src/Machine/src/Serval.Machine.Shared/Services/ILocalBuildJobFactory.cs @@ -0,0 +1,18 @@ +namespace Serval.Machine.Shared.Services; + +public interface ILocalBuildJobFactory +{ + EngineType EngineType { get; } + + string? Serialize(BuildStage stage, object? data); + + Task RunAsync( + IServiceProvider serviceProvider, + string engineId, + string buildId, + BuildStage stage, + string? jobData, + string? buildOptions, + CancellationToken cancellationToken + ); +} diff --git a/src/Machine/src/Serval.Machine.Shared/Services/IModelFactory.cs b/src/Machine/src/Serval.Machine.Shared/Services/IModelFactory.cs index a2c22058f..fc57ccec9 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/IModelFactory.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/IModelFactory.cs @@ -5,5 +5,4 @@ public interface IModelFactory void InitNew(string engineDir); void Cleanup(string engineDir); Task UpdateEngineFromAsync(string engineDir, Stream source, CancellationToken cancellationToken = default); - Task SaveEngineToAsync(string engineDir, Stream destination, CancellationToken cancellationToken = default); } diff --git a/src/Machine/src/Serval.Machine.Shared/Services/LocalBuildJobRunner.cs b/src/Machine/src/Serval.Machine.Shared/Services/LocalBuildJobRunner.cs new file mode 100644 index 000000000..b12ad8ee2 --- /dev/null +++ b/src/Machine/src/Serval.Machine.Shared/Services/LocalBuildJobRunner.cs @@ -0,0 +1,336 @@ +namespace Serval.Machine.Shared.Services; + +public class LocalBuildJobRunner( + IEnumerable factories, + IServiceScopeFactory serviceScopeFactory, + ILogger logger +) : BackgroundService, IBuildJobRunner +{ + private static readonly Dictionary EngineGroups = new() + { + [EngineType.SmtTransfer] = EngineGroup.Translation, + [EngineType.Nmt] = EngineGroup.Translation, + [EngineType.Statistical] = EngineGroup.WordAlignment, + }; + + private static readonly BoundedChannelOptions ChannelOptions = new(128) + { + FullMode = BoundedChannelFullMode.Wait, + SingleReader = true, + SingleWriter = false, + }; + + private readonly Dictionary> _jobChannels = new() + { + [EngineGroup.Translation] = Channel.CreateBounded(ChannelOptions), + [EngineGroup.WordAlignment] = Channel.CreateBounded(ChannelOptions), + }; + private readonly ConcurrentDictionary _pendingJobs = new(); + private readonly ConcurrentDictionary _activeCts = new(); + private readonly Dictionary _factories = factories.ToDictionary(f => + f.EngineType + ); + private readonly IServiceScopeFactory _serviceScopeFactory = serviceScopeFactory; + private readonly ILogger _logger = logger; + + public BuildJobRunnerType Type => BuildJobRunnerType.Local; + + public Task CreateEngineAsync( + string engineId, + string? name = null, + CancellationToken cancellationToken = default + ) => Task.CompletedTask; + + public Task DeleteEngineAsync(string engineId, CancellationToken cancellationToken = default) => Task.CompletedTask; + + public Task<(string JobId, string? JobData)> CreateJobAsync( + EngineType engineType, + string engineId, + string buildId, + BuildStage stage, + object? data = null, + string? buildOptions = null, + CancellationToken cancellationToken = default + ) + { + string jobId = Guid.NewGuid().ToString(); + string? jobData = _factories.TryGetValue(engineType, out ILocalBuildJobFactory? factory) + ? factory.Serialize(stage, data) + : null; + return Task.FromResult((jobId, jobData)); + } + + public Task DeleteJobAsync(string jobId, CancellationToken cancellationToken = default) + { + bool removed = _pendingJobs.TryRemove(jobId, out _); + if (_activeCts.TryRemove(jobId, out CancellationTokenSource? cts)) + { + cts.Cancel(); + cts.Dispose(); + removed = true; + } + return Task.FromResult(removed); + } + + public Task EnqueueJobAsync( + string jobId, + EngineType engineType, + CancellationToken cancellationToken = default + ) => Task.FromResult(true); + + public Task StopJobAsync(string jobId, CancellationToken cancellationToken = default) + { + _pendingJobs.TryRemove(jobId, out _); + if (_activeCts.TryRemove(jobId, out CancellationTokenSource? cts)) + { + cts.Cancel(); + cts.Dispose(); + return Task.FromResult(true); + } + return Task.FromResult(false); + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + // Scope lives for the duration of ExecuteAsync to keep subscriptions alive. + using IServiceScope scope = _serviceScopeFactory.CreateScope(); + var translationEngines = scope.ServiceProvider.GetRequiredService>(); + var wordAlignmentEngines = scope.ServiceProvider.GetRequiredService>(); + + // Subscriptions are created before recovery so no changes are missed during the recovery window. + using ISubscription translationSub = await translationEngines.SubscribeAsync( + e => + e.CurrentBuild != null + && e.CurrentBuild.BuildJobRunner == BuildJobRunnerType.Local + && e.CurrentBuild.JobState == BuildJobState.Pending, + stoppingToken + ); + using ISubscription wordAlignmentSub = await wordAlignmentEngines.SubscribeAsync( + e => + e.CurrentBuild != null + && e.CurrentBuild.BuildJobRunner == BuildJobRunnerType.Local + && e.CurrentBuild.JobState == BuildJobState.Pending, + stoppingToken + ); + + await RecoverPendingJobsAsync(scope.ServiceProvider, stoppingToken); + + await Task.WhenAll( + WatchEngineGroupAsync(translationSub, EngineGroup.Translation, stoppingToken), + WatchEngineGroupAsync(wordAlignmentSub, EngineGroup.WordAlignment, stoppingToken), + ProcessJobsAsync(EngineGroup.Translation, stoppingToken), + ProcessJobsAsync(EngineGroup.WordAlignment, stoppingToken) + ); + } + + private async Task RecoverPendingJobsAsync(IServiceProvider sp, CancellationToken cancellationToken) + { + var translationBuildJobService = sp.GetRequiredService>(); + var wordAlignmentBuildJobService = sp.GetRequiredService>(); + var dataAccessContext = sp.GetRequiredService(); + var translationPlatform = sp.GetRequiredKeyedService(EngineGroup.Translation); + var wordAlignmentPlatform = sp.GetRequiredKeyedService(EngineGroup.WordAlignment); + + await RecoverEngineGroupAsync( + translationBuildJobService, + translationPlatform, + dataAccessContext, + cancellationToken + ); + await RecoverEngineGroupAsync( + wordAlignmentBuildJobService, + wordAlignmentPlatform, + dataAccessContext, + cancellationToken + ); + } + + private async Task RecoverEngineGroupAsync( + IBuildJobService buildJobService, + IPlatformService platformService, + IDataAccessContext dataAccessContext, + CancellationToken cancellationToken + ) + where TEngine : ITrainingEngine + { + IReadOnlyList engines = await buildJobService.GetBuildingEnginesAsync( + BuildJobRunnerType.Local, + cancellationToken + ); + + foreach (TEngine engine in engines.Where(e => e.CurrentBuild!.JobState == BuildJobState.Active)) + { + await ResetActiveJobAsync( + buildJobService, + platformService, + dataAccessContext, + engine.EngineId, + engine.CurrentBuild!.BuildId, + cancellationToken + ); + } + + // Re-query after Active→Pending resets to get the refreshed list + IReadOnlyList pending = await buildJobService.GetBuildingEnginesAsync( + BuildJobRunnerType.Local, + cancellationToken + ); + + foreach ( + TEngine engine in pending + .Where(e => e.CurrentBuild!.JobState == BuildJobState.Pending) + .OrderBy(e => e.CurrentBuild!.QueuedAt) + ) + { + EnqueueRecoveredJob(engine.EngineId, engine.CurrentBuild!, engine.Type); + } + } + + private static async Task ResetActiveJobAsync( + IBuildJobService buildJobService, + IPlatformService platformService, + IDataAccessContext dataAccessContext, + string engineId, + string buildId, + CancellationToken cancellationToken + ) + { + await dataAccessContext.WithTransactionAsync( + async ct => + { + await platformService.BuildRestartingAsync(buildId, CancellationToken.None); + await buildJobService.BuildJobRestartingAsync(engineId, buildId, CancellationToken.None); + }, + cancellationToken: cancellationToken + ); + } + + private void EnqueueRecoveredJob(string engineId, Build build, EngineType engineType) + { + if ( + _pendingJobs.TryAdd( + build.JobId, + new JobInfo(engineId, build.BuildId, engineType, build.Stage, build.JobData, build.Options) + ) + ) + { + _jobChannels[EngineGroups[engineType]].Writer.TryWrite(build.JobId); + } + } + + private async Task WatchEngineGroupAsync( + ISubscription subscription, + EngineGroup engineGroup, + CancellationToken cancellationToken + ) + where TEngine : ITrainingEngine + { + while (!cancellationToken.IsCancellationRequested) + { + EntityChange change = subscription.Change; + if (change.Type is EntityChangeType.Insert or EntityChangeType.Update && change.Entity != null) + { + TEngine engine = change.Entity; + Build? build = engine.CurrentBuild; + if ( + build?.BuildJobRunner == BuildJobRunnerType.Local + && build.JobState == BuildJobState.Pending + && !_activeCts.ContainsKey(build.JobId) + && _pendingJobs.TryAdd( + build.JobId, + new JobInfo( + engine.EngineId, + build.BuildId, + engine.Type, + build.Stage, + build.JobData, + build.Options + ) + ) + ) + { + _jobChannels[engineGroup].Writer.TryWrite(build.JobId); + } + } + + try + { + await subscription.WaitForChangeAsync( + changeTypes: new HashSet { EntityChangeType.Insert, EntityChangeType.Update }, + cancellationToken: cancellationToken + ); + } + catch (OperationCanceledException) + { + break; + } + } + } + + private async Task ProcessJobsAsync(EngineGroup engineGroup, CancellationToken stoppingToken) + { + Channel channel = _jobChannels[engineGroup]; + while (!stoppingToken.IsCancellationRequested) + { + string jobId; + try + { + jobId = await channel.Reader.ReadAsync(stoppingToken); + } + catch (OperationCanceledException) + { + break; + } + + if (!_pendingJobs.TryRemove(jobId, out JobInfo? info)) + continue; + + var cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken); + _activeCts[jobId] = cts; + try + { + await ExecuteJobAsync(jobId, info, cts); + } + catch (OperationCanceledException) when (!stoppingToken.IsCancellationRequested) + { + // job was explicitly canceled via StopJobAsync; continue processing the queue + } + } + } + + private async Task ExecuteJobAsync(string jobId, JobInfo info, CancellationTokenSource cts) + { + try + { + using IServiceScope scope = _serviceScopeFactory.CreateScope(); + ILocalBuildJobFactory factory = _factories[info.EngineType]; + await factory.RunAsync( + scope.ServiceProvider, + info.EngineId, + info.BuildId, + info.Stage, + info.JobData, + info.BuildOptions, + cts.Token + ); + } + catch (Exception ex) when (ex is not OperationCanceledException) + { + _logger.LogError(ex, "Unhandled exception in local build job {JobId}", jobId); + } + finally + { + _activeCts.TryRemove(jobId, out _); + cts.Dispose(); + } + } + + private record JobInfo( + string EngineId, + string BuildId, + EngineType EngineType, + BuildStage Stage, + string? JobData, + string? BuildOptions + ); +} diff --git a/src/Machine/src/Serval.Machine.Shared/Services/ModelFactoryBase.cs b/src/Machine/src/Serval.Machine.Shared/Services/ModelFactoryBase.cs index b9b5e8b7b..12db93c3a 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/ModelFactoryBase.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/ModelFactoryBase.cs @@ -2,19 +2,13 @@ public abstract class ModelFactoryBase : IModelFactory { - public virtual ITrainer CreateTrainer( + public abstract ITrainer CreateTrainer( string engineDir, IRangeTokenizer tokenizer, IParallelTextCorpus corpus - ) - { - throw new NotImplementedException(); - } + ); - public virtual void InitNew(string engineDir) - { - throw new NotImplementedException(); - } + public abstract void InitNew(string engineDir); public abstract void Cleanup(string engineDir); @@ -40,24 +34,4 @@ await TarFile.ExtractToDirectoryAsync( cancellationToken: cancellationToken ); } - - public async Task SaveEngineToAsync( - string engineDir, - Stream destination, - CancellationToken cancellationToken = default - ) - { - // create zip archive in memory stream - // This cannot be created directly to the shared stream because it all needs to be written at once - await using MemoryStream memoryStream = new(); - await TarFile.CreateFromDirectoryAsync( - engineDir, - memoryStream, - includeBaseDirectory: false, - cancellationToken: cancellationToken - ); - memoryStream.Seek(0, SeekOrigin.Begin); - await using GZipStream gzipStream = new(destination, CompressionMode.Compress); - await memoryStream.CopyToAsync(gzipStream, cancellationToken); - } } diff --git a/src/Machine/src/Serval.Machine.Shared/Services/NmtClearMLBuildJobFactory.cs b/src/Machine/src/Serval.Machine.Shared/Services/NmtClearMLBuildJobFactory.cs index ee2b07bc7..4ab83dc42 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/NmtClearMLBuildJobFactory.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/NmtClearMLBuildJobFactory.cs @@ -17,7 +17,6 @@ public async Task CreateJobScriptAsync( string buildId, string modelType, BuildStage stage, - object? data = null, string? buildOptions = null, CancellationToken cancellationToken = default ) diff --git a/src/Machine/src/Serval.Machine.Shared/Services/NmtEngineService.cs b/src/Machine/src/Serval.Machine.Shared/Services/NmtEngineService.cs index 8108e1a98..35ed66768 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/NmtEngineService.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/NmtEngineService.cs @@ -91,7 +91,7 @@ public async Task StartBuildAsync( ) { bool building = !await _buildJobService.StartBuildJobAsync( - BuildJobRunnerType.Hangfire, + BuildJobRunnerType.Local, EngineType.Nmt, engineId, buildId, diff --git a/src/Machine/src/Serval.Machine.Shared/Services/NmtHangfireBuildJobFactory.cs b/src/Machine/src/Serval.Machine.Shared/Services/NmtHangfireBuildJobFactory.cs deleted file mode 100644 index c9a4a5b07..000000000 --- a/src/Machine/src/Serval.Machine.Shared/Services/NmtHangfireBuildJobFactory.cs +++ /dev/null @@ -1,28 +0,0 @@ -using static Serval.Machine.Shared.Services.HangfireBuildJobRunner; - -namespace Serval.Machine.Shared.Services; - -public class NmtHangfireBuildJobFactory : IHangfireBuildJobFactory -{ - public EngineType EngineType => EngineType.Nmt; - - public Job CreateJob(string engineId, string buildId, BuildStage stage, object? data, string? buildOptions) - { - return stage switch - { - BuildStage.Preprocess => CreateJob< - TranslationEngine, - NmtPreprocessBuildJob, - IReadOnlyList - >(engineId, buildId, BuildJobQueues.Nmt, data, buildOptions), - BuildStage.Postprocess => CreateJob( - engineId, - buildId, - BuildJobQueues.Nmt, - data, - buildOptions - ), - _ => throw new ArgumentException("Unknown build stage.", nameof(stage)), - }; - } -} diff --git a/src/Machine/src/Serval.Machine.Shared/Services/NmtLocalBuildJobFactory.cs b/src/Machine/src/Serval.Machine.Shared/Services/NmtLocalBuildJobFactory.cs new file mode 100644 index 000000000..5b0be2793 --- /dev/null +++ b/src/Machine/src/Serval.Machine.Shared/Services/NmtLocalBuildJobFactory.cs @@ -0,0 +1,57 @@ +namespace Serval.Machine.Shared.Services; + +public class NmtLocalBuildJobFactory : ILocalBuildJobFactory +{ + private static readonly JsonSerializerOptions SerializerOptions = new() + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, + }; + + public EngineType EngineType => EngineType.Nmt; + + public string? Serialize(BuildStage stage, object? data) => + stage switch + { + BuildStage.Preprocess => JsonSerializer.Serialize( + (IReadOnlyList)data!, + SerializerOptions + ), + BuildStage.Postprocess => data is (int tc, double conf) + ? JsonSerializer.Serialize(new PostprocessData(tc, conf), SerializerOptions) + : null, + _ => null, + }; + + public async Task RunAsync( + IServiceProvider serviceProvider, + string engineId, + string buildId, + BuildStage stage, + string? jobData, + string? buildOptions, + CancellationToken cancellationToken + ) + { + switch (stage) + { + case BuildStage.Preprocess: + var preprocessJob = ActivatorUtilities.CreateInstance(serviceProvider); + var corpora = JsonSerializer.Deserialize>(jobData!, SerializerOptions)!; + await preprocessJob.RunAsync(engineId, buildId, corpora, buildOptions, cancellationToken); + break; + case BuildStage.Postprocess: + var postprocessJob = ActivatorUtilities.CreateInstance(serviceProvider); + var postData = JsonSerializer.Deserialize(jobData!, SerializerOptions)!; + await postprocessJob.RunAsync( + engineId, + buildId, + (postData.TrainCount, postData.Confidence), + buildOptions, + cancellationToken + ); + break; + default: + throw new ArgumentException($"NMT does not support local stage: {stage}", nameof(stage)); + } + } +} diff --git a/src/Machine/src/Serval.Machine.Shared/Services/PostprocessBuildJob.cs b/src/Machine/src/Serval.Machine.Shared/Services/PostprocessBuildJob.cs index d263d82a3..e25541b1d 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/PostprocessBuildJob.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/PostprocessBuildJob.cs @@ -8,7 +8,7 @@ public abstract class PostprocessBuildJob( ILogger> logger, ISharedFileService sharedFileService, IOptionsMonitor options -) : HangfireBuildJob(platformService, engines, dataAccessContext, buildJobService, logger) +) : BuildJob(platformService, engines, dataAccessContext, buildJobService, logger) where TEngine : ITrainingEngine { protected ISharedFileService SharedFileService { get; } = sharedFileService; @@ -19,12 +19,7 @@ protected virtual Task SaveModelAsync(string engineId, string buildId) return Task.FromResult(0); } - protected override async Task CleanupAsync( - string engineId, - string buildId, - (int, double) data, - JobCompletionStatus completionStatus - ) + protected override async Task CleanupAsync(string engineId, string buildId, JobCompletionStatus completionStatus) { if (completionStatus is JobCompletionStatus.Restarting) return; diff --git a/src/Machine/src/Serval.Machine.Shared/Services/PostprocessData.cs b/src/Machine/src/Serval.Machine.Shared/Services/PostprocessData.cs new file mode 100644 index 000000000..8853d4bdf --- /dev/null +++ b/src/Machine/src/Serval.Machine.Shared/Services/PostprocessData.cs @@ -0,0 +1,3 @@ +namespace Serval.Machine.Shared.Services; + +public record PostprocessData(int TrainCount, double Confidence); diff --git a/src/Machine/src/Serval.Machine.Shared/Services/PreprocessBuildJob.cs b/src/Machine/src/Serval.Machine.Shared/Services/PreprocessBuildJob.cs index fce1c1e7d..dfad1cdfb 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/PreprocessBuildJob.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/PreprocessBuildJob.cs @@ -10,7 +10,7 @@ public abstract class PreprocessBuildJob( IParallelCorpusService parallelCorpusService, IOptionsMonitor options ) - : HangfireBuildJob>( + : BuildJob>( platformService, engines, dataAccessContext, @@ -28,7 +28,7 @@ IOptionsMonitor options Encoder = JavaScriptEncoder.Create(UnicodeRanges.All), }; - internal BuildJobRunnerType TrainJobRunnerType { get; init; } = BuildJobRunnerType.ClearML; + internal BuildJobRunnerType TrainJobRunnerType { get; set; } = BuildJobRunnerType.ClearML; protected readonly BuildJobOptions BuildJobOptions = options.CurrentValue; protected readonly ISharedFileService SharedFileService = sharedFileService; protected readonly IParallelCorpusService ParallelCorpusService = parallelCorpusService; @@ -112,12 +112,7 @@ CancellationToken cancellationToken CancellationToken cancellationToken ); - protected override async Task CleanupAsync( - string engineId, - string buildId, - IReadOnlyList data, - JobCompletionStatus completionStatus - ) + protected override async Task CleanupAsync(string engineId, string buildId, JobCompletionStatus completionStatus) { if (completionStatus is JobCompletionStatus.Canceled) { diff --git a/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferClearMLBuildJobFactory.cs b/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferClearMLBuildJobFactory.cs index fe97eaeb5..0552c093d 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferClearMLBuildJobFactory.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferClearMLBuildJobFactory.cs @@ -15,7 +15,6 @@ public async Task CreateJobScriptAsync( string buildId, string modelType, BuildStage stage, - object? data = null, string? buildOptions = null, CancellationToken cancellationToken = default ) diff --git a/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferEngineService.cs b/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferEngineService.cs index e142984f4..04fa9ae5a 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferEngineService.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferEngineService.cs @@ -194,7 +194,7 @@ public async Task StartBuildAsync( ) { bool building = !await _buildJobService.StartBuildJobAsync( - BuildJobRunnerType.Hangfire, + BuildJobRunnerType.Local, EngineType.SmtTransfer, engineId, buildId, diff --git a/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferHangfireBuildJobFactory.cs b/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferHangfireBuildJobFactory.cs deleted file mode 100644 index eb2880c90..000000000 --- a/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferHangfireBuildJobFactory.cs +++ /dev/null @@ -1,34 +0,0 @@ -using static Serval.Machine.Shared.Services.HangfireBuildJobRunner; - -namespace Serval.Machine.Shared.Services; - -public class SmtTransferHangfireBuildJobFactory : IHangfireBuildJobFactory -{ - public EngineType EngineType => EngineType.SmtTransfer; - - public Job CreateJob(string engineId, string buildId, BuildStage stage, object? data, string? buildOptions) - { - return stage switch - { - BuildStage.Preprocess => CreateJob< - TranslationEngine, - SmtTransferPreprocessBuildJob, - IReadOnlyList - >(engineId, buildId, BuildJobQueues.SmtTransfer, data, buildOptions), - BuildStage.Postprocess => CreateJob( - engineId, - buildId, - BuildJobQueues.SmtTransfer, - data, - buildOptions - ), - BuildStage.Train => CreateJob( - engineId, - buildId, - BuildJobQueues.SmtTransfer, - buildOptions - ), - _ => throw new ArgumentException("Unknown build stage.", nameof(stage)), - }; - } -} diff --git a/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferLocalBuildJobFactory.cs b/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferLocalBuildJobFactory.cs new file mode 100644 index 000000000..69bd722cc --- /dev/null +++ b/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferLocalBuildJobFactory.cs @@ -0,0 +1,58 @@ +namespace Serval.Machine.Shared.Services; + +public class SmtTransferLocalBuildJobFactory : ILocalBuildJobFactory +{ + private static readonly JsonSerializerOptions SerializerOptions = new() + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, + }; + + public EngineType EngineType => EngineType.SmtTransfer; + + public string? Serialize(BuildStage stage, object? data) => + stage switch + { + BuildStage.Preprocess => JsonSerializer.Serialize( + (IReadOnlyList)data!, + SerializerOptions + ), + BuildStage.Train => null, + BuildStage.Postprocess => data is (int tc, double conf) + ? JsonSerializer.Serialize(new PostprocessData(tc, conf), SerializerOptions) + : null, + _ => null, + }; + + public async Task RunAsync( + IServiceProvider serviceProvider, + string engineId, + string buildId, + BuildStage stage, + string? jobData, + string? buildOptions, + CancellationToken cancellationToken + ) + { + switch (stage) + { + case BuildStage.Preprocess: + var preprocessJob = ActivatorUtilities.CreateInstance(serviceProvider); + var corpora = JsonSerializer.Deserialize>(jobData!, SerializerOptions)!; + await preprocessJob.RunAsync(engineId, buildId, corpora, buildOptions, cancellationToken); + break; + case BuildStage.Postprocess: + var postprocessJob = ActivatorUtilities.CreateInstance(serviceProvider); + var postData = JsonSerializer.Deserialize(jobData!, SerializerOptions)!; + await postprocessJob.RunAsync( + engineId, + buildId, + (postData.TrainCount, postData.Confidence), + buildOptions, + cancellationToken + ); + break; + default: + throw new ArgumentException($"Unsupported stage: {stage}", nameof(stage)); + } + } +} diff --git a/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferTrainBuildJob.cs b/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferTrainBuildJob.cs deleted file mode 100644 index 2047e1b84..000000000 --- a/src/Machine/src/Serval.Machine.Shared/Services/SmtTransferTrainBuildJob.cs +++ /dev/null @@ -1,249 +0,0 @@ -namespace Serval.Machine.Shared.Services; - -public class SmtTransferTrainBuildJob( - [FromKeyedServices(EngineGroup.Translation)] IPlatformService platformService, - IRepository engines, - IDataAccessContext dataAccessContext, - IBuildJobService buildJobService, - ILogger logger, - ISharedFileService sharedFileService, - ITruecaserFactory truecaserFactory, - ISmtModelFactory smtModelFactory, - ITransferEngineFactory transferEngineFactory -) : HangfireBuildJob(platformService, engines, dataAccessContext, buildJobService, logger) -{ - // Using JavaScriptEncoder.Create(UnicodeRanges.All) to avoid escaping surrogate pairs - // (including those outside of the BMP) which can result in invalid UTF-8. - // This is safe since the data written by this writer is only read internally and only as UTF-8 encoded JSON. - protected static readonly JsonWriterOptions PretranslateWriterOptions = new() - { - Indented = true, - Encoder = JavaScriptEncoder.Create(UnicodeRanges.All), - }; - private static readonly JsonSerializerOptions JsonSerializerOptions = new() - { - PropertyNamingPolicy = JsonNamingPolicy.CamelCase, - }; - private const int BatchSize = 128; - - private readonly ISharedFileService _sharedFileService = sharedFileService; - private readonly ITruecaserFactory _truecaserFactory = truecaserFactory; - private readonly ISmtModelFactory _smtModelFactory = smtModelFactory; - private readonly ITransferEngineFactory _transferEngineFactory = transferEngineFactory; - - protected override async Task DoWorkAsync( - string engineId, - string buildId, - object? data, - string? buildOptions, - CancellationToken cancellationToken - ) - { - using TempDirectory tempDir = new(buildId); - string corpusDir = Path.Combine(tempDir.Path, "corpus"); - await DownloadDataAsync(buildId, corpusDir, cancellationToken); - - // assemble corpus - ITextCorpus sourceCorpus = new TextFileTextCorpus( - Path.Combine(corpusDir, "train.src.txt"), - Path.Combine(corpusDir, "train.key-terms.src.txt") - ); - ITextCorpus targetCorpus = new TextFileTextCorpus( - Path.Combine(corpusDir, "train.trg.txt"), - Path.Combine(corpusDir, "train.key-terms.trg.txt") - ); - IParallelTextCorpus parallelCorpus = sourceCorpus.AlignRows(targetCorpus); - - // train SMT model - string engineDir = Path.Combine(tempDir.Path, "engine"); - (int trainCorpusSize, double confidence) = await TrainAsync( - buildId, - engineDir, - targetCorpus, - parallelCorpus, - cancellationToken - ); - - cancellationToken.ThrowIfCancellationRequested(); - - await GeneratePretranslationsAsync(buildId, engineDir, cancellationToken); - - bool canceling = !await BuildJobService.StartBuildJobAsync( - BuildJobRunnerType.Hangfire, - EngineType.SmtTransfer, - engineId, - buildId, - BuildStage.Postprocess, - data: (trainCorpusSize, confidence), - buildOptions: buildOptions, - cancellationToken: cancellationToken - ); - if (canceling) - throw new OperationCanceledException(); - } - - protected override async Task CleanupAsync( - string engineId, - string buildId, - object? data, - JobCompletionStatus completionStatus - ) - { - if (completionStatus is JobCompletionStatus.Canceled) - { - try - { - await _sharedFileService.DeleteAsync($"builds/{buildId}/"); - } - catch (Exception e) - { - Logger.LogWarning(e, "Unable to to delete job data for build {BuildId}.", buildId); - } - } - } - - private async Task DownloadDataAsync(string buildId, string corpusDir, CancellationToken cancellationToken) - { - Directory.CreateDirectory(corpusDir); - await using Stream srcText = await _sharedFileService.OpenReadAsync( - $"builds/{buildId}/train.src.txt", - cancellationToken - ); - await using FileStream srcFileStream = File.Create(Path.Combine(corpusDir, "train.src.txt")); - await srcText.CopyToAsync(srcFileStream, cancellationToken); - - await using Stream tgtText = await _sharedFileService.OpenReadAsync( - $"builds/{buildId}/train.trg.txt", - cancellationToken - ); - await using FileStream tgtFileStream = File.Create(Path.Combine(corpusDir, "train.trg.txt")); - await tgtText.CopyToAsync(tgtFileStream, cancellationToken); - - await using Stream srcKeyTermsText = await _sharedFileService.OpenReadAsync( - $"builds/{buildId}/train.key-terms.src.txt", - cancellationToken - ); - await using FileStream srcKeyTermsFileStream = File.Create(Path.Combine(corpusDir, "train.key-terms.src.txt")); - await srcKeyTermsText.CopyToAsync(srcKeyTermsFileStream, cancellationToken); - - await using Stream tgtKeyTermsText = await _sharedFileService.OpenReadAsync( - $"builds/{buildId}/train.key-terms.trg.txt", - cancellationToken - ); - await using FileStream tgtKeyTermsFileStream = File.Create(Path.Combine(corpusDir, "train.key-terms.trg.txt")); - await tgtKeyTermsFileStream.CopyToAsync(tgtKeyTermsText, cancellationToken); - } - - private async Task<(int TrainCorpusSize, double Confidence)> TrainAsync( - string buildId, - string engineDir, - ITextCorpus targetCorpus, - IParallelTextCorpus parallelCorpus, - CancellationToken cancellationToken - ) - { - _smtModelFactory.InitNew(engineDir); - LatinWordTokenizer tokenizer = new(); - int trainCorpusSize; - double confidence; - using ITrainer smtModelTrainer = _smtModelFactory.CreateTrainer(engineDir, tokenizer, parallelCorpus); - using ITrainer truecaseTrainer = _truecaserFactory.CreateTrainer(engineDir, tokenizer, targetCorpus); - cancellationToken.ThrowIfCancellationRequested(); - - var progress = new BuildProgress(PlatformService, buildId); - await smtModelTrainer.TrainAsync(progress, cancellationToken); - await truecaseTrainer.TrainAsync(cancellationToken: cancellationToken); - - trainCorpusSize = smtModelTrainer.Stats.TrainCorpusSize; - confidence = smtModelTrainer.Stats.Metrics["bleu"] * 100.0; - - cancellationToken.ThrowIfCancellationRequested(); - - await smtModelTrainer.SaveAsync(cancellationToken); - await truecaseTrainer.SaveAsync(cancellationToken); - - await using Stream engineStream = await _sharedFileService.OpenWriteAsync( - $"builds/{buildId}/model.tar.gz", - cancellationToken - ); - await _smtModelFactory.SaveEngineToAsync(engineDir, engineStream, cancellationToken); - return (trainCorpusSize, confidence); - } - - private async Task GeneratePretranslationsAsync( - string buildId, - string engineDir, - CancellationToken cancellationToken - ) - { - await using Stream sourceStream = await _sharedFileService.OpenReadAsync( - $"builds/{buildId}/pretranslate.src.json", - cancellationToken - ); - - IAsyncEnumerable pretranslations = JsonSerializer - .DeserializeAsyncEnumerable(sourceStream, JsonSerializerOptions, cancellationToken) - .OfType(); - - await using Stream targetStream = await _sharedFileService.OpenWriteAsync( - $"builds/{buildId}/pretranslate.trg.json", - cancellationToken - ); - await using Utf8JsonWriter targetWriter = new(targetStream, PretranslateWriterOptions); - - LatinWordTokenizer tokenizer = new(); - LatinWordDetokenizer detokenizer = new(); - ITruecaser truecaser = _truecaserFactory.Create(engineDir); - using IInteractiveTranslationModel smtModel = _smtModelFactory.Create( - engineDir, - tokenizer, - detokenizer, - truecaser - ); - using ITranslationEngine? transferEngine = _transferEngineFactory.Create( - engineDir, - tokenizer, - detokenizer, - truecaser - ); - HybridTranslationEngine hybridEngine = new(smtModel, transferEngine) { TargetDetokenizer = detokenizer }; - - await foreach (IReadOnlyList batch in BatchAsync(pretranslations)) - { - string[] segments = batch.Select(p => p.Translation).ToArray(); - IReadOnlyList results = await hybridEngine.TranslateBatchAsync( - segments, - cancellationToken - ); - foreach ((Pretranslation pretranslation, TranslationResult result) in batch.Zip(results)) - { - JsonSerializer.Serialize( - targetWriter, - pretranslation with - { - Translation = result.Translation, - }, - JsonSerializerOptions - ); - } - } - } - - public static async IAsyncEnumerable> BatchAsync( - IAsyncEnumerable pretranslations - ) - { - List batch = []; - await foreach (Pretranslation item in pretranslations) - { - batch.Add(item); - if (batch.Count == BatchSize) - { - yield return batch; - batch = []; - } - } - if (batch.Count > 0) - yield return batch; - } -} diff --git a/src/Machine/src/Serval.Machine.Shared/Services/StatisticalEngineService.cs b/src/Machine/src/Serval.Machine.Shared/Services/StatisticalEngineService.cs index 7d04f5262..b44ccdbef 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/StatisticalEngineService.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/StatisticalEngineService.cs @@ -119,7 +119,7 @@ public async Task StartBuildAsync( ) { bool building = !await _buildJobService.StartBuildJobAsync( - BuildJobRunnerType.Hangfire, + BuildJobRunnerType.Local, EngineType.Statistical, engineId, buildId, diff --git a/src/Machine/src/Serval.Machine.Shared/Services/StatisticalHangfireBuildJobFactory.cs b/src/Machine/src/Serval.Machine.Shared/Services/StatisticalHangfireBuildJobFactory.cs deleted file mode 100644 index 712b79552..000000000 --- a/src/Machine/src/Serval.Machine.Shared/Services/StatisticalHangfireBuildJobFactory.cs +++ /dev/null @@ -1,34 +0,0 @@ -using static Serval.Machine.Shared.Services.HangfireBuildJobRunner; - -namespace Serval.Machine.Shared.Services; - -public class StatisticalHangfireBuildJobFactory : IHangfireBuildJobFactory -{ - public EngineType EngineType => EngineType.Statistical; - - public Job CreateJob(string engineId, string buildId, BuildStage stage, object? data, string? buildOptions) - { - return stage switch - { - BuildStage.Preprocess => CreateJob< - WordAlignmentEngine, - WordAlignmentPreprocessBuildJob, - IReadOnlyList - >(engineId, buildId, BuildJobQueues.Statistical, data, buildOptions), - BuildStage.Postprocess => CreateJob( - engineId, - buildId, - BuildJobQueues.Statistical, - data, - buildOptions - ), - BuildStage.Train => CreateJob( - engineId, - buildId, - BuildJobQueues.Statistical, - buildOptions - ), - _ => throw new ArgumentException("Unknown build stage.", nameof(stage)), - }; - } -} diff --git a/src/Machine/src/Serval.Machine.Shared/Services/StatisticalLocalBuildJobFactory.cs b/src/Machine/src/Serval.Machine.Shared/Services/StatisticalLocalBuildJobFactory.cs new file mode 100644 index 000000000..c9925cad8 --- /dev/null +++ b/src/Machine/src/Serval.Machine.Shared/Services/StatisticalLocalBuildJobFactory.cs @@ -0,0 +1,58 @@ +namespace Serval.Machine.Shared.Services; + +public class StatisticalLocalBuildJobFactory : ILocalBuildJobFactory +{ + private static readonly JsonSerializerOptions SerializerOptions = new() + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, + }; + + public EngineType EngineType => EngineType.Statistical; + + public string? Serialize(BuildStage stage, object? data) => + stage switch + { + BuildStage.Preprocess => JsonSerializer.Serialize( + (IReadOnlyList)data!, + SerializerOptions + ), + BuildStage.Train => null, + BuildStage.Postprocess => data is (int tc, double conf) + ? JsonSerializer.Serialize(new PostprocessData(tc, conf), SerializerOptions) + : null, + _ => null, + }; + + public async Task RunAsync( + IServiceProvider serviceProvider, + string engineId, + string buildId, + BuildStage stage, + string? jobData, + string? buildOptions, + CancellationToken cancellationToken + ) + { + switch (stage) + { + case BuildStage.Preprocess: + var preprocessJob = ActivatorUtilities.CreateInstance(serviceProvider); + var corpora = JsonSerializer.Deserialize>(jobData!, SerializerOptions)!; + await preprocessJob.RunAsync(engineId, buildId, corpora, buildOptions, cancellationToken); + break; + case BuildStage.Postprocess: + var postprocessJob = ActivatorUtilities.CreateInstance(serviceProvider); + var postData = JsonSerializer.Deserialize(jobData!, SerializerOptions)!; + await postprocessJob.RunAsync( + engineId, + buildId, + (postData.TrainCount, postData.Confidence), + buildOptions, + cancellationToken + ); + break; + default: + throw new ArgumentException($"Unsupported stage: {stage}", nameof(stage)); + } + } +} diff --git a/src/Machine/src/Serval.Machine.Shared/Services/StatisticalTrainBuildJob.cs b/src/Machine/src/Serval.Machine.Shared/Services/StatisticalTrainBuildJob.cs deleted file mode 100644 index 6a5c71bf7..000000000 --- a/src/Machine/src/Serval.Machine.Shared/Services/StatisticalTrainBuildJob.cs +++ /dev/null @@ -1,234 +0,0 @@ -namespace Serval.Machine.Shared.Services; - -public class StatisticalTrainBuildJob( - [FromKeyedServices(EngineGroup.WordAlignment)] IPlatformService platformService, - IRepository engines, - IDataAccessContext dataAccessContext, - IBuildJobService buildJobService, - ILogger logger, - ISharedFileService sharedFileService, - IWordAlignmentModelFactory wordAlignmentModelFactory -) : HangfireBuildJob(platformService, engines, dataAccessContext, buildJobService, logger) -{ - // Using JavaScriptEncoder.Create(UnicodeRanges.All) to avoid escaping surrogate pairs - // (including those outside of the BMP) which can result in invalid UTF-8. - // This is safe since the data written by this writer is only read internally and only as UTF-8 encoded JSON. - protected static readonly JsonWriterOptions WordAlignmentWriterOptions = new() - { - Indented = true, - Encoder = JavaScriptEncoder.Create(UnicodeRanges.All), - }; - private static readonly JsonSerializerOptions JsonSerializerOptions = new() - { - PropertyNamingPolicy = JsonNamingPolicy.CamelCase, - }; - private const int BatchSize = 128; - - private readonly ISharedFileService _sharedFileService = sharedFileService; - private readonly IWordAlignmentModelFactory _wordAlignmentFactory = wordAlignmentModelFactory; - - protected override async Task DoWorkAsync( - string engineId, - string buildId, - object? data, - string? buildOptions, - CancellationToken cancellationToken - ) - { - string? modelType = null; - if (buildOptions is not null) - modelType = (string?)JsonSerializer.Deserialize(buildOptions)?["thot_align"]?["model_type"]; - - using TempDirectory tempDir = new(buildId); - string corpusDir = Path.Combine(tempDir.Path, "corpus"); - await DownloadDataAsync(buildId, corpusDir, cancellationToken); - - // assemble corpus - ITextCorpus sourceCorpus = new TextFileTextCorpus( - Path.Combine(corpusDir, "train.src.txt"), - Path.Combine(corpusDir, "train.key-terms.src.txt") - ); - ITextCorpus targetCorpus = new TextFileTextCorpus( - Path.Combine(corpusDir, "train.trg.txt"), - Path.Combine(corpusDir, "train.key-terms.trg.txt") - ); - IParallelTextCorpus parallelCorpus = sourceCorpus.AlignRows(targetCorpus); - - // train word alignment model - string engineDir = Path.Combine(tempDir.Path, "engine"); - int trainCount = await TrainAsync(buildId, engineDir, parallelCorpus, modelType, cancellationToken); - - cancellationToken.ThrowIfCancellationRequested(); - - await GenerateWordAlignmentsAsync(buildId, engineDir, modelType, cancellationToken); - - bool canceling = !await BuildJobService.StartBuildJobAsync( - BuildJobRunnerType.Hangfire, - EngineType.Statistical, - engineId, - buildId, - BuildStage.Postprocess, - buildOptions: buildOptions, - data: (trainCount, 0.0), - cancellationToken: cancellationToken - ); - if (canceling) - throw new OperationCanceledException(); - } - - protected override async Task CleanupAsync( - string engineId, - string buildId, - object? data, - JobCompletionStatus completionStatus - ) - { - if (completionStatus is JobCompletionStatus.Canceled) - { - try - { - await _sharedFileService.DeleteAsync($"builds/{buildId}/"); - } - catch (Exception e) - { - Logger.LogWarning(e, "Unable to to delete job data for build {BuildId}.", buildId); - } - } - } - - private async Task DownloadDataAsync(string buildId, string corpusDir, CancellationToken cancellationToken) - { - Directory.CreateDirectory(corpusDir); - await using Stream srcText = await _sharedFileService.OpenReadAsync( - $"builds/{buildId}/train.src.txt", - cancellationToken - ); - await using FileStream srcFileStream = File.Create(Path.Combine(corpusDir, "train.src.txt")); - await srcText.CopyToAsync(srcFileStream, cancellationToken); - - await using Stream tgtText = await _sharedFileService.OpenReadAsync( - $"builds/{buildId}/train.trg.txt", - cancellationToken - ); - await using FileStream tgtFileStream = File.Create(Path.Combine(corpusDir, "train.trg.txt")); - await tgtText.CopyToAsync(tgtFileStream, cancellationToken); - - await using Stream srcKeyTermsText = await _sharedFileService.OpenReadAsync( - $"builds/{buildId}/train.key-terms.src.txt", - cancellationToken - ); - await using FileStream srcKeyTermsFileStream = File.Create(Path.Combine(corpusDir, "train.key-terms.src.txt")); - await srcKeyTermsText.CopyToAsync(srcKeyTermsFileStream, cancellationToken); - - await using Stream tgtKeyTermsText = await _sharedFileService.OpenReadAsync( - $"builds/{buildId}/train.key-terms.trg.txt", - cancellationToken - ); - await using FileStream tgtKeyTermsFileStream = File.Create(Path.Combine(corpusDir, "train.key-terms.trg.txt")); - await tgtKeyTermsFileStream.CopyToAsync(tgtKeyTermsText, cancellationToken); - } - - private async Task TrainAsync( - string buildId, - string engineDir, - IParallelTextCorpus parallelCorpus, - string? modelType, - CancellationToken cancellationToken - ) - { - _wordAlignmentFactory.InitNew(engineDir); - LatinWordTokenizer tokenizer = new(); - using ITrainer wordAlignmentTrainer = _wordAlignmentFactory.CreateTrainer( - engineDir, - tokenizer, - parallelCorpus, - modelType - ); - cancellationToken.ThrowIfCancellationRequested(); - - var progress = new BuildProgress(PlatformService, buildId); - await wordAlignmentTrainer.TrainAsync(progress, cancellationToken); - - int trainCorpusSize = wordAlignmentTrainer.Stats.TrainCorpusSize; - - cancellationToken.ThrowIfCancellationRequested(); - - await wordAlignmentTrainer.SaveAsync(cancellationToken); - - await using Stream engineStream = await _sharedFileService.OpenWriteAsync( - $"builds/{buildId}/model.tar.gz", - cancellationToken - ); - await _wordAlignmentFactory.SaveEngineToAsync(engineDir, engineStream, cancellationToken); - return trainCorpusSize; - } - - private async Task GenerateWordAlignmentsAsync( - string buildId, - string engineDir, - string? modelType, - CancellationToken cancellationToken - ) - { - await using Stream sourceStream = await _sharedFileService.OpenReadAsync( - $"builds/{buildId}/word_alignments.inputs.json", - cancellationToken - ); - - IAsyncEnumerable wordAlignments = JsonSerializer - .DeserializeAsyncEnumerable(sourceStream, JsonSerializerOptions, cancellationToken) - .OfType(); - - await using Stream targetStream = await _sharedFileService.OpenWriteAsync( - $"builds/{buildId}/word_alignments.outputs.json", - cancellationToken - ); - await using Utf8JsonWriter targetWriter = new(targetStream, WordAlignmentWriterOptions); - - LatinWordTokenizer tokenizer = new(); - LatinWordDetokenizer detokenizer = new(); - using IWordAlignmentModel wordAlignmentModel = _wordAlignmentFactory.Create(engineDir, modelType); - await foreach (IReadOnlyList batch in BatchAsync(wordAlignments)) - { - (IReadOnlyList Source, IReadOnlyList Target)[] segments = batch - .Select(p => (p.SourceTokens, p.TargetTokens)) - .ToArray(); - IReadOnlyList results = wordAlignmentModel.AlignBatch(segments); - foreach ((Models.WordAlignment wordAlignment, WordAlignmentMatrix result) in batch.Zip(results)) - { - List alignedWordPairs = result.ToAlignedWordPairs().ToList(); - wordAlignmentModel.ComputeAlignedWordPairScores( - wordAlignment.SourceTokens, - wordAlignment.TargetTokens, - alignedWordPairs - ); - JsonSerializer.Serialize( - targetWriter, - wordAlignment with - { - Alignment = alignedWordPairs, - }, - JsonSerializerOptions - ); - } - } - } - - public static async IAsyncEnumerable> BatchAsync( - IAsyncEnumerable wordAlignments - ) - { - List batch = []; - await foreach (Models.WordAlignment item in wordAlignments) - { - batch.Add(item); - if (batch.Count == BatchSize) - { - yield return batch; - batch = []; - } - } - if (batch.Count > 0) - yield return batch; - } -} diff --git a/src/Machine/src/Serval.Machine.Shared/Services/StatsiticalClearMLBuildJobFactory.cs b/src/Machine/src/Serval.Machine.Shared/Services/StatsiticalClearMLBuildJobFactory.cs index 1e104b2cd..e93958575 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/StatsiticalClearMLBuildJobFactory.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/StatsiticalClearMLBuildJobFactory.cs @@ -15,7 +15,6 @@ public async Task CreateJobScriptAsync( string buildId, string modelType, BuildStage stage, - object? data = null, string? buildOptions = null, CancellationToken cancellationToken = default ) diff --git a/src/Machine/src/Serval.Machine.Shared/Services/ThotWordAlignmentModelFactory.cs b/src/Machine/src/Serval.Machine.Shared/Services/ThotWordAlignmentModelFactory.cs index 9087c0b17..bc848278a 100644 --- a/src/Machine/src/Serval.Machine.Shared/Services/ThotWordAlignmentModelFactory.cs +++ b/src/Machine/src/Serval.Machine.Shared/Services/ThotWordAlignmentModelFactory.cs @@ -20,6 +20,12 @@ public IWordAlignmentModel Create(string engineDir, string? modelTypeStr = null) return new SymmetrizedWordAlignmentModel(directModel, inverseModel); } + public override ITrainer CreateTrainer( + string engineDir, + IRangeTokenizer tokenizer, + IParallelTextCorpus corpus + ) => CreateTrainer(engineDir, tokenizer, corpus); + public ITrainer CreateTrainer( string engineDir, ITokenizer tokenizer, diff --git a/src/Machine/src/Serval.Machine.Shared/Usings.cs b/src/Machine/src/Serval.Machine.Shared/Usings.cs index 3de67a4f3..eaab3c5dd 100644 --- a/src/Machine/src/Serval.Machine.Shared/Usings.cs +++ b/src/Machine/src/Serval.Machine.Shared/Usings.cs @@ -19,13 +19,11 @@ global using System.Text.Json.Serialization; global using System.Text.RegularExpressions; global using System.Text.Unicode; +global using System.Threading.Channels; global using Amazon; global using Amazon.Runtime; global using Amazon.S3; global using Amazon.S3.Model; -global using Hangfire; -global using Hangfire.Common; -global using Hangfire.States; global using Microsoft.Extensions.Caching.Memory; global using Microsoft.Extensions.Configuration; global using Microsoft.Extensions.DependencyInjection; diff --git a/src/Machine/test/Serval.Machine.Shared.Tests/Serval.Machine.Shared.Tests.csproj b/src/Machine/test/Serval.Machine.Shared.Tests/Serval.Machine.Shared.Tests.csproj index 40276586d..928b91d7c 100644 --- a/src/Machine/test/Serval.Machine.Shared.Tests/Serval.Machine.Shared.Tests.csproj +++ b/src/Machine/test/Serval.Machine.Shared.Tests/Serval.Machine.Shared.Tests.csproj @@ -18,8 +18,7 @@ all - - + runtime; build; native; contentfiles; analyzers; buildtransitive all diff --git a/src/Machine/test/Serval.Machine.Shared.Tests/Services/ClearMLMonitorServiceTests.cs b/src/Machine/test/Serval.Machine.Shared.Tests/Services/ClearMLMonitorServiceTests.cs index 06b0d7bee..558715f61 100644 --- a/src/Machine/test/Serval.Machine.Shared.Tests/Services/ClearMLMonitorServiceTests.cs +++ b/src/Machine/test/Serval.Machine.Shared.Tests/Services/ClearMLMonitorServiceTests.cs @@ -1,5 +1,3 @@ -using Microsoft.Extensions.DependencyInjection; - namespace Serval.Machine.Shared.Services; [TestFixture] @@ -269,7 +267,7 @@ public async Task MonitorClearMLTasksPerDomain_CompletedStatus_ProperlyHandlesCo _translationBuildJobService .StartBuildJobAsync( - BuildJobRunnerType.Hangfire, + BuildJobRunnerType.Local, engine.Type, engine.EngineId, engine.CurrentBuild.BuildId, @@ -296,7 +294,7 @@ await VerifyStatusUpdate( await _translationBuildJobService .Received(1) .StartBuildJobAsync( - BuildJobRunnerType.Hangfire, + BuildJobRunnerType.Local, engine.Type, engine.EngineId, engine.CurrentBuild.BuildId, diff --git a/src/Machine/test/Serval.Machine.Shared.Tests/Services/NmtEngineServiceTests.cs b/src/Machine/test/Serval.Machine.Shared.Tests/Services/NmtEngineServiceTests.cs index a88910861..1a087da61 100644 --- a/src/Machine/test/Serval.Machine.Shared.Tests/Services/NmtEngineServiceTests.cs +++ b/src/Machine/test/Serval.Machine.Shared.Tests/Services/NmtEngineServiceTests.cs @@ -1,4 +1,4 @@ -using Serval.Translation.Contracts; +using Serval.Translation.Contracts; namespace Serval.Machine.Shared.Services; @@ -91,10 +91,9 @@ public async Task GetLanguageInfoAsync() private class TestEnvironment : DisposableBase { - private readonly Hangfire.InMemory.InMemoryStorage _memoryStorage; - private readonly BackgroundJobClient _jobClient; - private BackgroundJobServer _jobServer; - private readonly IDistributedReaderWriterLockFactory _lockFactory; + private readonly LocalBuildJobRunner _jobRunner; + private readonly CancellationTokenSource _runnerCts = new(); + private readonly ServiceProvider _serviceProvider; private readonly CancellationTokenSource _cancellationTokenSource = new(); private Func _trainJobFunc; private Task? _trainJobTask; @@ -118,17 +117,9 @@ public TestEnvironment() IsModelPersisted = false, } ); - _memoryStorage = new Hangfire.InMemory.InMemoryStorage(); - _jobClient = new BackgroundJobClient(_memoryStorage); PlatformService = Substitute.For(); PlatformService.EngineGroup.Returns(EngineGroup.Translation); TranslationPlatformService = Substitute.For(); - _lockFactory = new DistributedReaderWriterLockFactory( - new OptionsWrapper(new ServiceOptions { ServiceId = "host" }), - new OptionsWrapper(new DistributedReaderWriterLockOptions()), - new MemoryRepository(), - new ObjectIdGenerator() - ); ClearMLService = Substitute.For(); ClearMLService .GetProjectIdAsync("engine1", Arg.Any()) @@ -172,9 +163,42 @@ public TestEnvironment() ], } ); + + IBuildJobService? deferredBuildJobService = null; + var services = new ServiceCollection(); + services.AddScoped(_ => deferredBuildJobService!); + services.AddSingleton(Substitute.For>()); + services.AddKeyedSingleton(EngineGroup.Translation, (_, _) => PlatformService); + services.AddKeyedSingleton(EngineGroup.WordAlignment, (_, _) => Substitute.For()); + services.AddSingleton>(Engines); + services.AddSingleton>(new MemoryRepository()); + services.AddScoped(_ => new MemoryDataAccessContext()); + services.AddSingleton(SharedFileService); + services.AddSingleton(new LanguageTagService()); + services.AddSingleton(Substitute.For()); + services.AddSingleton(BuildJobOptions); + services.AddLogging(); + _serviceProvider = services.BuildServiceProvider(); + + _jobRunner = new LocalBuildJobRunner( + [new NmtLocalBuildJobFactory()], + _serviceProvider.GetRequiredService(), + _serviceProvider.GetRequiredService>() + ); + + var clearMLOptions = Substitute.For>(); + clearMLOptions.CurrentValue.Returns(new ClearMLOptions()); + ClearMLQueueService = new ClearMLMonitorService( + Substitute.For(), + ClearMLService, + SharedFileService, + clearMLOptions, + BuildJobOptions, + Substitute.For>() + ); BuildJobService = new BuildJobService( [ - new HangfireBuildJobRunner(_jobClient, [new NmtHangfireBuildJobFactory()]), + _jobRunner, new ClearMLBuildJobRunner( ClearMLService, [ @@ -189,17 +213,8 @@ public TestEnvironment() ], Engines ); - var clearMLOptions = Substitute.For>(); - clearMLOptions.CurrentValue.Returns(new ClearMLOptions()); - ClearMLQueueService = new ClearMLMonitorService( - Substitute.For(), - ClearMLService, - SharedFileService, - clearMLOptions, - BuildJobOptions, - Substitute.For>() - ); - _jobServer = CreateJobServer(); + deferredBuildJobService = BuildJobService; + _ = _jobRunner.StartAsync(_runnerCts.Token); Service = CreateService(); } @@ -218,28 +233,6 @@ public void PersistModel() Engines.Replace(Engines.Get("engine1") with { IsModelPersisted = true }); } - public void StopServer() - { - _jobServer.Dispose(); - } - - public void StartServer() - { - _jobServer = CreateJobServer(); - Service = CreateService(); - } - - private BackgroundJobServer CreateJobServer() - { - var jobServerOptions = new BackgroundJobServerOptions - { - Activator = new EnvActivator(this), - Queues = new[] { BuildJobQueues.Nmt }, - CancellationCheckInterval = TimeSpan.FromMilliseconds(50), - }; - return new BackgroundJobServer(jobServerOptions, _memoryStorage); - } - private NmtEngineService CreateService() { return new NmtEngineService( @@ -292,7 +285,7 @@ private async Task RunNormalTrainJob() await using Stream stream = await SharedFileService.OpenWriteAsync("builds/build1/pretranslate.trg.json"); await BuildJobService.StartBuildJobAsync( - BuildJobRunnerType.Hangfire, + BuildJobRunnerType.Local, EngineType.Nmt, "engine1", "build1", @@ -313,46 +306,10 @@ private async Task RunInfiniteTrainJob() protected override void DisposeManagedResources() { - _jobServer.Dispose(); + _runnerCts.Cancel(); + _serviceProvider.Dispose(); _cancellationTokenSource.Dispose(); - } - - private class EnvActivator(TestEnvironment env) : JobActivator - { - private readonly TestEnvironment _env = env; - - public override object ActivateJob(Type jobType) - { - if (jobType == typeof(NmtPreprocessBuildJob)) - { - return new NmtPreprocessBuildJob( - _env.PlatformService, - _env.Engines, - new MemoryDataAccessContext(), - Substitute.For>(), - _env.BuildJobService, - _env.SharedFileService, - new LanguageTagService(), - Substitute.For(), - _env.BuildJobOptions - ); - } - if (jobType == typeof(TranslationPostprocessBuildJob)) - { - var buildJobOptions = Substitute.For>(); - buildJobOptions.CurrentValue.Returns(new BuildJobOptions()); - return new TranslationPostprocessBuildJob( - _env.PlatformService, - _env.Engines, - new MemoryDataAccessContext(), - _env.BuildJobService, - Substitute.For>(), - _env.SharedFileService, - _env.BuildJobOptions - ); - } - return base.ActivateJob(jobType); - } + _runnerCts.Dispose(); } } } diff --git a/src/Machine/test/Serval.Machine.Shared.Tests/Services/PreprocessBuildJobTests.cs b/src/Machine/test/Serval.Machine.Shared.Tests/Services/PreprocessBuildJobTests.cs index 50c802717..c5dc07d6e 100644 --- a/src/Machine/test/Serval.Machine.Shared.Tests/Services/PreprocessBuildJobTests.cs +++ b/src/Machine/test/Serval.Machine.Shared.Tests/Services/PreprocessBuildJobTests.cs @@ -130,7 +130,7 @@ public TestEnvironment() BuildId = "build1", JobId = "job1", JobState = BuildJobState.Pending, - BuildJobRunner = BuildJobRunnerType.Hangfire, + BuildJobRunner = BuildJobRunnerType.Local, Stage = BuildStage.Preprocess, ExecutionData = new BuildExecutionData(), }, @@ -151,7 +151,7 @@ public TestEnvironment() BuildId = "build1", JobId = "job1", JobState = BuildJobState.Pending, - BuildJobRunner = BuildJobRunnerType.Hangfire, + BuildJobRunner = BuildJobRunnerType.Local, Stage = BuildStage.Preprocess, ExecutionData = new BuildExecutionData(), }, @@ -172,7 +172,7 @@ public TestEnvironment() BuildId = "build1", JobId = "job1", JobState = BuildJobState.Pending, - BuildJobRunner = BuildJobRunnerType.Hangfire, + BuildJobRunner = BuildJobRunnerType.Local, Stage = BuildStage.Preprocess, ExecutionData = new BuildExecutionData(), }, @@ -193,7 +193,7 @@ public TestEnvironment() BuildId = "build1", JobId = "job1", JobState = BuildJobState.Pending, - BuildJobRunner = BuildJobRunnerType.Hangfire, + BuildJobRunner = BuildJobRunnerType.Local, Stage = BuildStage.Preprocess, ExecutionData = new BuildExecutionData(), }, @@ -259,9 +259,10 @@ public TestEnvironment() SharedFileService = new SharedFileService(Substitute.For()); BuildJobService = new BuildJobService( [ - new HangfireBuildJobRunner( - Substitute.For(), - [new NmtHangfireBuildJobFactory(), new SmtTransferHangfireBuildJobFactory()] + new LocalBuildJobRunner( + [new NmtLocalBuildJobFactory(), new SmtTransferLocalBuildJobFactory()], + Substitute.For(), + Substitute.For>() ), new ClearMLBuildJobRunner( ClearMLService, diff --git a/src/Machine/test/Serval.Machine.Shared.Tests/Services/SmtTransferEngineServiceTests.cs b/src/Machine/test/Serval.Machine.Shared.Tests/Services/SmtTransferEngineServiceTests.cs index 4fd088aee..b6ba9903f 100644 --- a/src/Machine/test/Serval.Machine.Shared.Tests/Services/SmtTransferEngineServiceTests.cs +++ b/src/Machine/test/Serval.Machine.Shared.Tests/Services/SmtTransferEngineServiceTests.cs @@ -1,4 +1,4 @@ -using Serval.Translation.Contracts; +using Serval.Translation.Contracts; namespace Serval.Machine.Shared.Services; @@ -28,11 +28,9 @@ public async Task CreateAsync() env.TransferEngineFactory.Received().InitNew(engineDir); } - [TestCase(BuildJobRunnerType.Hangfire)] - [TestCase(BuildJobRunnerType.ClearML)] - public async Task StartBuildAsync(BuildJobRunnerType trainJobRunnerType) + public async Task StartBuildAsync() { - using var env = new TestEnvironment(trainJobRunnerType); + using var env = new TestEnvironment(); TranslationEngine engine = env.Engines.Get(EngineId1); Assert.That(engine.BuildRevision, Is.EqualTo(1)); // ensure that the SMT model was loaded before training @@ -88,11 +86,9 @@ await env.Service.StartBuildAsync( _ = env.Truecaser.DidNotReceive().SaveAsync(); } - [TestCase(BuildJobRunnerType.Hangfire)] - [TestCase(BuildJobRunnerType.ClearML)] - public async Task CancelBuildAsync_Building(BuildJobRunnerType trainJobRunnerType) + public async Task CancelBuildAsync_Building() { - using var env = new TestEnvironment(trainJobRunnerType); + using var env = new TestEnvironment(); env.UseInfiniteTrainJob(); await env.Service.StartBuildAsync(EngineId1, BuildId1, Array.Empty(), "{}"); @@ -115,35 +111,9 @@ public async Task CancelBuildAsync_NotBuilding() Assert.That(await env.Service.CancelBuildAsync(EngineId1), Is.Null); } - [Test] - public async Task StartBuildAsync_RestartUnfinishedBuild() - { - using var env = new TestEnvironment(BuildJobRunnerType.Hangfire); - env.UseInfiniteTrainJob(); - - await env.Service.StartBuildAsync(EngineId1, BuildId1, Array.Empty(), "{}"); - await env.WaitForTrainingToStartAsync(); - TranslationEngine engine = env.Engines.Get(EngineId1); - Assert.That(engine.CurrentBuild, Is.Not.Null); - Assert.That(engine.CurrentBuild!.JobState, Is.EqualTo(BuildJobState.Active)); - env.StopServer(); - await env.WaitForBuildToRestartAsync(); - engine = env.Engines.Get(EngineId1); - Assert.That(engine.CurrentBuild, Is.Not.Null); - Assert.That(engine.CurrentBuild!.JobState, Is.EqualTo(BuildJobState.Pending)); - _ = env.PlatformService.Received().BuildRestartingAsync(BuildId1); - env.SmtBatchTrainer.ClearSubstitute(ClearOptions.CallActions); - env.StartServer(); - await env.WaitForBuildToFinishAsync(); - engine = env.Engines.Get(EngineId1); - Assert.That(engine.CurrentBuild, Is.Null); - } - - [TestCase(BuildJobRunnerType.Hangfire)] - [TestCase(BuildJobRunnerType.ClearML)] - public async Task DeleteAsync_WhileBuilding(BuildJobRunnerType trainJobRunnerType) + public async Task DeleteAsync_WhileBuilding() { - using var env = new TestEnvironment(trainJobRunnerType); + using var env = new TestEnvironment(); env.UseInfiniteTrainJob(); await env.Service.StartBuildAsync(EngineId1, BuildId1, Array.Empty(), "{}"); @@ -153,7 +123,6 @@ public async Task DeleteAsync_WhileBuilding(BuildJobRunnerType trainJobRunnerTyp Assert.That(engine.CurrentBuild!.JobState, Is.EqualTo(BuildJobState.Active)); await env.Service.DeleteAsync(EngineId1); await env.WaitForBuildToFinishAsync(); - await env.WaitForAllHangfireJobsToFinishAsync(); _ = env.SmtBatchTrainer.DidNotReceive().SaveAsync(); _ = env.TruecaserTrainer.DidNotReceive().SaveAsync(); Assert.That(env.Engines.Contains(EngineId1), Is.False); @@ -169,11 +138,9 @@ public async Task UpdateAsync() Assert.That(engine.TargetLanguage, Is.EqualTo("en")); } - [TestCase(BuildJobRunnerType.Hangfire)] - [TestCase(BuildJobRunnerType.ClearML)] - public async Task TrainSegmentPairAsync(BuildJobRunnerType trainJobRunnerType) + public async Task TrainSegmentPairAsync() { - using var env = new TestEnvironment(trainJobRunnerType); + using var env = new TestEnvironment(); env.UseInfiniteTrainJob(); await env.Service.StartBuildAsync(EngineId1, BuildId1, Array.Empty(), "{}"); @@ -242,19 +209,21 @@ public async Task GetLanguageInfoAsync() private class TestEnvironment : DisposableBase { - private readonly Hangfire.InMemory.InMemoryStorage _memoryStorage; - private readonly BackgroundJobClient _jobClient; - private BackgroundJobServer _jobServer; - private readonly ITruecaserFactory _truecaserFactory; private readonly IDistributedReaderWriterLockFactory _lockFactory; private readonly BuildJobRunnerType _trainJobRunnerType; + private readonly ClearMLBuildJobRunner _clearMLRunner; + private readonly ITruecaserFactory _truecaserFactory; + private readonly ServiceProvider _serviceProvider; + private readonly IBuildJobService? _deferredBuildJobService; + private readonly LocalBuildJobRunner _jobRunner; + private readonly CancellationTokenSource _runnerCts = new(); private Task? _trainJobTask; private readonly CancellationTokenSource _cancellationTokenSource = new(); private bool _training = true; - public TestEnvironment(BuildJobRunnerType trainJobRunnerType = BuildJobRunnerType.ClearML) + public TestEnvironment() { - _trainJobRunnerType = trainJobRunnerType; + _trainJobRunnerType = BuildJobRunnerType.ClearML; Engines = new MemoryRepository(); Engines.Add( new TranslationEngine @@ -269,8 +238,6 @@ public TestEnvironment(BuildJobRunnerType trainJobRunnerType = BuildJobRunnerTyp } ); TrainSegmentPairs = new MemoryRepository(); - _memoryStorage = new Hangfire.InMemory.InMemoryStorage(); - _jobClient = new BackgroundJobClient(_memoryStorage); PlatformService = Substitute.For(); PlatformService.EngineGroup.Returns(EngineGroup.Translation); SmtModel = Substitute.For(); @@ -343,20 +310,42 @@ public TestEnvironment(BuildJobRunnerType trainJobRunnerType = BuildJobRunnerTyp BuildJobOptions, Substitute.For>() ); - BuildJobService = new BuildJobService( - [ - new HangfireBuildJobRunner(_jobClient, [new SmtTransferHangfireBuildJobFactory()]), - new ClearMLBuildJobRunner( - ClearMLService, - [new SmtTransferClearMLBuildJobFactory(SharedFileService, Engines)], - BuildJobOptions - ), - ], - Engines + + _clearMLRunner = new ClearMLBuildJobRunner( + ClearMLService, + [new SmtTransferClearMLBuildJobFactory(SharedFileService, Engines)], + BuildJobOptions ); - _jobServer = CreateJobServer(); + + var smtEngineOptions = Substitute.For>(); + smtEngineOptions.CurrentValue.Returns(new SmtTransferEngineOptions()); + + var services = new ServiceCollection(); + services.AddScoped(_ => _deferredBuildJobService!); + services.AddSingleton(Substitute.For>()); + services.AddKeyedSingleton(EngineGroup.Translation, (_, _) => PlatformService); + services.AddKeyedSingleton(EngineGroup.WordAlignment, (_, _) => Substitute.For()); + services.AddSingleton>(Engines); + services.AddSingleton>(new MemoryRepository()); + services.AddSingleton>(TrainSegmentPairs); + services.AddScoped(_ => new MemoryDataAccessContext()); + services.AddSingleton(SharedFileService); + services.AddSingleton(_lockFactory); + services.AddSingleton(Substitute.For()); + services.AddSingleton(BuildJobOptions); + services.AddSingleton(_truecaserFactory); + services.AddSingleton(SmtModelFactory); + services.AddSingleton(TransferEngineFactory); + services.AddSingleton(smtEngineOptions); + services.AddLogging(); + _serviceProvider = services.BuildServiceProvider(); + + _jobRunner = CreateJobRunner(); + BuildJobService = CreateBuildJobService(); + _deferredBuildJobService = BuildJobService; StateService = CreateStateService(); Service = CreateService(); + _ = _jobRunner.StartAsync(_runnerCts.Token); } public SmtTransferEngineService Service { get; private set; } @@ -377,26 +366,13 @@ [new SmtTransferClearMLBuildJobFactory(SharedFileService, Engines)], public ISharedFileService SharedFileService { get; } - public IBuildJobService BuildJobService { get; } + public IBuildJobService BuildJobService { get; private set; } public async Task CommitAsync(TimeSpan inactiveTimeout) { await StateService.CommitAsync(_lockFactory, Engines, inactiveTimeout); } - public void StopServer() - { - _jobServer.Dispose(); - StateService.Dispose(); - } - - public void StartServer() - { - _jobServer = CreateJobServer(); - StateService = CreateStateService(); - Service = CreateService(); - } - public void UseInfiniteTrainJob() { SmtBatchTrainer.TrainAsync( @@ -417,15 +393,18 @@ public void StopTraining() _training = false; } - private BackgroundJobServer CreateJobServer() + private LocalBuildJobRunner CreateJobRunner() { - var jobServerOptions = new BackgroundJobServerOptions - { - Activator = new EnvActivator(this), - Queues = new[] { BuildJobQueues.SmtTransfer }, - CancellationCheckInterval = TimeSpan.FromMilliseconds(50), - }; - return new BackgroundJobServer(jobServerOptions, _memoryStorage); + return new LocalBuildJobRunner( + [new SmtTransferTestLocalBuildJobFactory(_trainJobRunnerType)], + _serviceProvider.GetRequiredService(), + _serviceProvider.GetRequiredService>() + ); + } + + private IBuildJobService CreateBuildJobService() + { + return new BuildJobService([_jobRunner, _clearMLRunner], Engines); } private SmtTransferEngineStateService CreateStateService() @@ -602,13 +581,6 @@ private static TranslationSources[] GetSources(int count, bool isUnknown) return sources; } - public async Task WaitForAllHangfireJobsToFinishAsync() - { - IMonitoringApi monitoringApi = _memoryStorage.GetMonitoringApi(); - while (monitoringApi.EnqueuedCount(BuildJobQueues.SmtTransfer) > 0 || monitoringApi.ProcessingCount() > 0) - await Task.Delay(50); - } - public async Task WaitForBuildToFinishAsync() { await WaitForBuildState(e => e.CurrentBuild is null); @@ -650,7 +622,10 @@ private async Task WaitForBuildState(Func predicate) protected override void DisposeManagedResources() { StateService.Dispose(); - _jobServer.Dispose(); + _runnerCts.Cancel(); + _serviceProvider.Dispose(); + _cancellationTokenSource.Dispose(); + _runnerCts.Dispose(); } private async Task RunTrainJob() @@ -684,7 +659,7 @@ private async Task RunTrainJob() ); await BuildJobService.StartBuildJobAsync( - BuildJobRunnerType.Hangfire, + BuildJobRunnerType.Local, EngineType.SmtTransfer, EngineId1, BuildId1, @@ -698,64 +673,53 @@ await BuildJobService.StartBuildJobAsync( } } - private class EnvActivator(TestEnvironment env) : JobActivator + private class SmtTransferTestLocalBuildJobFactory(BuildJobRunnerType trainJobRunnerType) : ILocalBuildJobFactory { - private readonly TestEnvironment _env = env; + private static readonly JsonSerializerOptions SerializerOptions = new() + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, + }; + + public EngineType EngineType => EngineType.SmtTransfer; + + public string? Serialize(BuildStage stage, object? data) => + new SmtTransferLocalBuildJobFactory().Serialize(stage, data); - public override object ActivateJob(Type jobType) + public async Task RunAsync( + IServiceProvider serviceProvider, + string engineId, + string buildId, + BuildStage stage, + string? jobData, + string? buildOptions, + CancellationToken cancellationToken + ) { - if (jobType == typeof(SmtTransferPreprocessBuildJob)) - { - return new SmtTransferPreprocessBuildJob( - _env.PlatformService, - _env.Engines, - new MemoryDataAccessContext(), - Substitute.For>(), - _env.BuildJobService, - _env.SharedFileService, - _env._lockFactory, - _env.TrainSegmentPairs, - Substitute.For(), - _env.BuildJobOptions - ) - { - TrainJobRunnerType = _env._trainJobRunnerType, - }; - } - if (jobType == typeof(SmtTransferPostprocessBuildJob)) - { - var engineOptions = Substitute.For>(); - engineOptions.CurrentValue.Returns(new SmtTransferEngineOptions()); - return new SmtTransferPostprocessBuildJob( - _env.PlatformService, - _env.Engines, - new MemoryDataAccessContext(), - _env.BuildJobService, - Substitute.For>(), - _env.SharedFileService, - _env._lockFactory, - _env.TrainSegmentPairs, - _env.SmtModelFactory, - _env._truecaserFactory, - _env.BuildJobOptions, - engineOptions - ); - } - if (jobType == typeof(SmtTransferTrainBuildJob)) + switch (stage) { - return new SmtTransferTrainBuildJob( - _env.PlatformService, - _env.Engines, - new MemoryDataAccessContext(), - _env.BuildJobService, - Substitute.For>(), - _env.SharedFileService, - _env._truecaserFactory, - _env.SmtModelFactory, - _env.TransferEngineFactory - ); + case BuildStage.Preprocess: + var preprocessJob = ActivatorUtilities.CreateInstance( + serviceProvider + ); + preprocessJob.TrainJobRunnerType = trainJobRunnerType; + var corpora = JsonSerializer.Deserialize>( + jobData!, + SerializerOptions + )!; + await preprocessJob.RunAsync(engineId, buildId, corpora, buildOptions, cancellationToken); + break; + default: + await new SmtTransferLocalBuildJobFactory().RunAsync( + serviceProvider, + engineId, + buildId, + stage, + jobData, + buildOptions, + cancellationToken + ); + break; } - return base.ActivateJob(jobType); } } } diff --git a/src/Machine/test/Serval.Machine.Shared.Tests/Services/StatisticalEngineServiceTests.cs b/src/Machine/test/Serval.Machine.Shared.Tests/Services/StatisticalEngineServiceTests.cs index c6d6ea0d5..92099c77d 100644 --- a/src/Machine/test/Serval.Machine.Shared.Tests/Services/StatisticalEngineServiceTests.cs +++ b/src/Machine/test/Serval.Machine.Shared.Tests/Services/StatisticalEngineServiceTests.cs @@ -26,11 +26,9 @@ public async Task CreateAsync() env.WordAlignmentModelFactory.Received().InitNew(engineDir); } - [TestCase(BuildJobRunnerType.Hangfire)] - [TestCase(BuildJobRunnerType.ClearML)] - public async Task StartBuildAsync(BuildJobRunnerType trainJobRunnerType) + public async Task StartBuildAsync() { - using var env = new TestEnvironment(trainJobRunnerType); + using var env = new TestEnvironment(); WordAlignmentEngine engine = env.Engines.Get(EngineId1); Assert.That(engine.BuildRevision, Is.EqualTo(1)); // ensure that the model was loaded before training @@ -80,11 +78,9 @@ await env.Service.StartBuildAsync( env.WordAlignmentModel.Received().Dispose(); } - [TestCase(BuildJobRunnerType.Hangfire)] - [TestCase(BuildJobRunnerType.ClearML)] - public async Task CancelBuildAsync_Building(BuildJobRunnerType trainJobRunnerType) + public async Task CancelBuildAsync_Building() { - using var env = new TestEnvironment(trainJobRunnerType); + using var env = new TestEnvironment(); env.UseInfiniteTrainJob(); await env.Service.StartBuildAsync(EngineId1, BuildId1, Array.Empty(), "{}"); @@ -99,19 +95,16 @@ public async Task CancelBuildAsync_Building(BuildJobRunnerType trainJobRunnerTyp Assert.That(engine.CurrentBuild, Is.Null); } - [TestCase(BuildJobRunnerType.Hangfire)] - [TestCase(BuildJobRunnerType.ClearML)] - public async Task CancelBuildAsync_NotBuilding(BuildJobRunnerType trainJobRunnerType) + [Test] + public async Task CancelBuildAsync_NotBuilding() { - using var env = new TestEnvironment(trainJobRunnerType); + using var env = new TestEnvironment(); Assert.That(await env.Service.CancelBuildAsync(EngineId1), Is.Null); } - [TestCase(BuildJobRunnerType.Hangfire)] - [TestCase(BuildJobRunnerType.ClearML)] - public async Task DeleteAsync_WhileBuilding(BuildJobRunnerType trainJobRunnerType) + public async Task DeleteAsync_WhileBuilding() { - using var env = new TestEnvironment(trainJobRunnerType); + using var env = new TestEnvironment(); env.UseInfiniteTrainJob(); await env.Service.StartBuildAsync(EngineId1, BuildId1, Array.Empty(), "{}"); @@ -121,7 +114,6 @@ public async Task DeleteAsync_WhileBuilding(BuildJobRunnerType trainJobRunnerTyp Assert.That(engine.CurrentBuild!.JobState, Is.EqualTo(BuildJobState.Active)); await env.Service.DeleteAsync(EngineId1); await env.WaitForBuildToFinishAsync(); - await env.WaitForAllHangfireJobsToFinishAsync(); _ = env.WordAlignmentBatchTrainer.DidNotReceive().SaveAsync(); Assert.That(env.Engines.Contains(EngineId1), Is.False); } @@ -142,18 +134,19 @@ public async Task AlignAsync() private class TestEnvironment : DisposableBase { - private readonly Hangfire.InMemory.InMemoryStorage _memoryStorage; - private readonly BackgroundJobClient _jobClient; - private BackgroundJobServer _jobServer; private readonly IDistributedReaderWriterLockFactory _lockFactory; private readonly BuildJobRunnerType _trainJobRunnerType; + private readonly ClearMLBuildJobRunner _clearMLRunner; + private readonly ServiceProvider _serviceProvider; + private readonly IBuildJobService? _deferredBuildJobService; + private readonly LocalBuildJobRunner _jobRunner; + private readonly CancellationTokenSource _runnerCts = new(); private Task? _trainJobTask; private readonly CancellationTokenSource _cancellationTokenSource = new(); - private bool _training = true; - public TestEnvironment(BuildJobRunnerType trainJobRunnerType = BuildJobRunnerType.ClearML) + public TestEnvironment() { - _trainJobRunnerType = trainJobRunnerType; + _trainJobRunnerType = BuildJobRunnerType.ClearML; Engines = new MemoryRepository(); Engines.Add( new WordAlignmentEngine @@ -166,8 +159,6 @@ public TestEnvironment(BuildJobRunnerType trainJobRunnerType = BuildJobRunnerTyp BuildRevision = 1, } ); - _memoryStorage = new Hangfire.InMemory.InMemoryStorage(); - _jobClient = new BackgroundJobClient(_memoryStorage); PlatformService = Substitute.For(); PlatformService.EngineGroup.Returns(EngineGroup.WordAlignment); WordAlignmentModel = Substitute.For(); @@ -226,20 +217,39 @@ public TestEnvironment(BuildJobRunnerType trainJobRunnerType = BuildJobRunnerTyp BuildJobOptions, Substitute.For>() ); - BuildJobService = new BuildJobService( - [ - new HangfireBuildJobRunner(_jobClient, [new StatisticalHangfireBuildJobFactory()]), - new ClearMLBuildJobRunner( - ClearMLService, - [new StatisticalClearMLBuildJobFactory(SharedFileService, Engines)], - BuildJobOptions - ), - ], - Engines + + _clearMLRunner = new ClearMLBuildJobRunner( + ClearMLService, + [new StatisticalClearMLBuildJobFactory(SharedFileService, Engines)], + BuildJobOptions ); - _jobServer = CreateJobServer(); + + var statisticalEngineOptions = Substitute.For>(); + statisticalEngineOptions.CurrentValue.Returns(new StatisticalEngineOptions()); + + var services = new ServiceCollection(); + services.AddScoped(_ => _deferredBuildJobService!); + services.AddSingleton(Substitute.For>()); + services.AddKeyedSingleton(EngineGroup.WordAlignment, (_, _) => PlatformService); + services.AddKeyedSingleton(EngineGroup.Translation, (_, _) => Substitute.For()); + services.AddSingleton>(new MemoryRepository()); + services.AddSingleton>(Engines); + services.AddScoped(_ => new MemoryDataAccessContext()); + services.AddSingleton(SharedFileService); + services.AddSingleton(_lockFactory); + services.AddSingleton(Substitute.For()); + services.AddSingleton(BuildJobOptions); + services.AddSingleton(WordAlignmentModelFactory); + services.AddSingleton(statisticalEngineOptions); + services.AddLogging(); + _serviceProvider = services.BuildServiceProvider(); + + _jobRunner = CreateJobRunner(); + BuildJobService = CreateBuildJobService(); + _deferredBuildJobService = BuildJobService; StateService = CreateStateService(); Service = CreateService(); + _ = _jobRunner.StartAsync(_runnerCts.Token); } public StatisticalEngineService Service { get; private set; } @@ -249,13 +259,10 @@ [new StatisticalClearMLBuildJobFactory(SharedFileService, Engines)], public ITrainer WordAlignmentBatchTrainer { get; } public IWordAlignmentModel WordAlignmentModel { get; } public IPlatformService PlatformService { get; } - public IClearMLService ClearMLService { get; } public IClearMLQueueService ClearMLMonitorService { get; } - public ISharedFileService SharedFileService { get; } - - public IBuildJobService BuildJobService { get; } + public IBuildJobService BuildJobService { get; private set; } public IOptionsMonitor BuildJobOptions { get; } public async Task CommitAsync(TimeSpan inactiveTimeout) @@ -263,26 +270,13 @@ public async Task CommitAsync(TimeSpan inactiveTimeout) await StateService.CommitAsync(_lockFactory, Engines, inactiveTimeout); } - public void StopServer() - { - _jobServer.Dispose(); - StateService.Dispose(); - } - - public void StartServer() - { - _jobServer = CreateJobServer(); - StateService = CreateStateService(); - Service = CreateService(); - } - public void UseInfiniteTrainJob() { WordAlignmentBatchTrainer.TrainAsync( Arg.Any>(), Arg.Do(cancellationToken => { - while (_training) + while (true) { cancellationToken.ThrowIfCancellationRequested(); Thread.Sleep(100); @@ -291,20 +285,18 @@ public void UseInfiniteTrainJob() ); } - public void StopTraining() + private LocalBuildJobRunner CreateJobRunner() { - _training = false; + return new LocalBuildJobRunner( + [new StatisticalTestLocalBuildJobFactory(_trainJobRunnerType)], + _serviceProvider.GetRequiredService(), + _serviceProvider.GetRequiredService>() + ); } - private BackgroundJobServer CreateJobServer() + private IBuildJobService CreateBuildJobService() { - var jobServerOptions = new BackgroundJobServerOptions - { - Activator = new EnvActivator(this), - Queues = new[] { BuildJobQueues.Statistical }, - CancellationCheckInterval = TimeSpan.FromMilliseconds(50), - }; - return new BackgroundJobServer(jobServerOptions, _memoryStorage); + return new BuildJobService([_jobRunner, _clearMLRunner], Engines); } private StatisticalEngineStateService CreateStateService() @@ -349,13 +341,6 @@ private IWordAlignmentModelFactory CreateWordAlignmentModelFactory() return factory; } - public async Task WaitForAllHangfireJobsToFinishAsync() - { - IMonitoringApi monitoringApi = _memoryStorage.GetMonitoringApi(); - while (monitoringApi.EnqueuedCount(BuildJobQueues.Statistical) > 0 || monitoringApi.ProcessingCount() > 0) - await Task.Delay(50); - } - public async Task WaitForBuildToFinishAsync() { await WaitForBuildState(e => e.CurrentBuild is null); @@ -397,7 +382,10 @@ private async Task WaitForBuildState(Func predicate) protected override void DisposeManagedResources() { StateService.Dispose(); - _jobServer.Dispose(); + _runnerCts.Cancel(); + _serviceProvider.Dispose(); + _cancellationTokenSource.Dispose(); + _runnerCts.Dispose(); } private async Task RunTrainJob() @@ -430,7 +418,7 @@ private async Task RunTrainJob() ); await BuildJobService.StartBuildJobAsync( - BuildJobRunnerType.Hangfire, + BuildJobRunnerType.Local, EngineType.Statistical, EngineId1, BuildId1, @@ -444,58 +432,53 @@ await BuildJobService.StartBuildJobAsync( } } - private class EnvActivator(TestEnvironment env) : JobActivator + private class StatisticalTestLocalBuildJobFactory(BuildJobRunnerType trainJobRunnerType) : ILocalBuildJobFactory { - private readonly TestEnvironment _env = env; + private static readonly JsonSerializerOptions SerializerOptions = new() + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, + }; + + public EngineType EngineType => EngineType.Statistical; - public override object ActivateJob(Type jobType) + public string? Serialize(BuildStage stage, object? data) => + new StatisticalLocalBuildJobFactory().Serialize(stage, data); + + public async Task RunAsync( + IServiceProvider serviceProvider, + string engineId, + string buildId, + BuildStage stage, + string? jobData, + string? buildOptions, + CancellationToken cancellationToken + ) { - if (jobType == typeof(WordAlignmentPreprocessBuildJob)) - { - return new WordAlignmentPreprocessBuildJob( - _env.PlatformService, - _env.Engines, - new MemoryDataAccessContext(), - Substitute.For>(), - _env.BuildJobService, - _env.SharedFileService, - Substitute.For(), - _env.BuildJobOptions - ) - { - TrainJobRunnerType = _env._trainJobRunnerType, - }; - } - if (jobType == typeof(StatisticalPostprocessBuildJob)) - { - var engineOptions = Substitute.For>(); - engineOptions.CurrentValue.Returns(new StatisticalEngineOptions()); - return new StatisticalPostprocessBuildJob( - _env.PlatformService, - _env.Engines, - new MemoryDataAccessContext(), - _env.BuildJobService, - Substitute.For>(), - _env.SharedFileService, - _env._lockFactory, - _env.WordAlignmentModelFactory, - _env.BuildJobOptions, - engineOptions - ); - } - if (jobType == typeof(StatisticalTrainBuildJob)) + switch (stage) { - return new StatisticalTrainBuildJob( - _env.PlatformService, - _env.Engines, - new MemoryDataAccessContext(), - _env.BuildJobService, - Substitute.For>(), - _env.SharedFileService, - _env.WordAlignmentModelFactory - ); + case BuildStage.Preprocess: + var preprocessJob = ActivatorUtilities.CreateInstance( + serviceProvider + ); + preprocessJob.TrainJobRunnerType = trainJobRunnerType; + var corpora = JsonSerializer.Deserialize>( + jobData!, + SerializerOptions + )!; + await preprocessJob.RunAsync(engineId, buildId, corpora, buildOptions, cancellationToken); + break; + default: + await new StatisticalLocalBuildJobFactory().RunAsync( + serviceProvider, + engineId, + buildId, + stage, + jobData, + buildOptions, + cancellationToken + ); + break; } - return base.ActivateJob(jobType); } } } diff --git a/src/Machine/test/Serval.Machine.Shared.Tests/Usings.cs b/src/Machine/test/Serval.Machine.Shared.Tests/Usings.cs index 8c24bf417..1ae4c6ce4 100644 --- a/src/Machine/test/Serval.Machine.Shared.Tests/Usings.cs +++ b/src/Machine/test/Serval.Machine.Shared.Tests/Usings.cs @@ -1,12 +1,11 @@ -global using Hangfire; -global using Hangfire.Storage; +global using System.Text.Json; +global using Microsoft.Extensions.DependencyInjection; global using Microsoft.Extensions.Hosting; global using Microsoft.Extensions.Hosting.Internal; global using Microsoft.Extensions.Logging; global using Microsoft.Extensions.Options; global using Nito.AsyncEx; global using NSubstitute; -global using NSubstitute.ClearExtensions; global using NSubstitute.ReceivedExtensions; global using NUnit.Framework; global using NUnit.Framework.Constraints; diff --git a/src/Serval/src/Serval.ApiServer/appsettings.json b/src/Serval/src/Serval.ApiServer/appsettings.json index d1fe47a12..14231ca85 100644 --- a/src/Serval/src/Serval.ApiServer/appsettings.json +++ b/src/Serval/src/Serval.ApiServer/appsettings.json @@ -30,19 +30,19 @@ "EngineType": "Nmt", "ModelType": "huggingface", "Queue": "jobs_backlog", - "DockerImage": "ghcr.io/sillsdev/machine.py:latest" + "DockerImage": "ghcr.io/sillsdev/machine.py:1.12.0" }, { "EngineType": "SmtTransfer", "ModelType": "thot", "Queue": "jobs_backlog.cpu_only", - "DockerImage": "ghcr.io/sillsdev/machine.py:latest.cpu_only" + "DockerImage": "ghcr.io/sillsdev/machine.py:1.12.0.cpu_only" }, { "EngineType": "Statistical", "ModelType": "thot", "Queue": "jobs_backlog.cpu_only", - "DockerImage": "ghcr.io/sillsdev/machine.py:latest.cpu_only" + "DockerImage": "ghcr.io/sillsdev/machine.py:1.12.0.cpu_only" } ] }, diff --git a/src/Serval/src/Serval.Shared/Serval.Shared.csproj b/src/Serval/src/Serval.Shared/Serval.Shared.csproj index 827921d4d..e6090683c 100644 --- a/src/Serval/src/Serval.Shared/Serval.Shared.csproj +++ b/src/Serval/src/Serval.Shared/Serval.Shared.csproj @@ -20,7 +20,7 @@ - + diff --git a/src/Serval/src/Serval.Translation.Contracts/ITranslationPlatformService.cs b/src/Serval/src/Serval.Translation.Contracts/ITranslationPlatformService.cs index 23f4621ff..320ebff21 100644 --- a/src/Serval/src/Serval.Translation.Contracts/ITranslationPlatformService.cs +++ b/src/Serval/src/Serval.Translation.Contracts/ITranslationPlatformService.cs @@ -22,6 +22,7 @@ Task UpdateBuildStatusAsync( CancellationToken cancellationToken = default ); Task UpdateBuildStatusAsync(string buildId, int step, CancellationToken cancellationToken = default); + Task BuildExistsAsync(string buildId, CancellationToken cancellationToken = default); Task IncrementEngineCorpusSizeAsync(string engineId, int count = 1, CancellationToken cancellationToken = default); Task InsertPretranslationsAsync( string engineId, diff --git a/src/Serval/src/Serval.Translation/Services/PlatformService.cs b/src/Serval/src/Serval.Translation/Services/PlatformService.cs index 4d27bb58b..33773ca78 100644 --- a/src/Serval/src/Serval.Translation/Services/PlatformService.cs +++ b/src/Serval/src/Serval.Translation/Services/PlatformService.cs @@ -231,6 +231,11 @@ await _pretranslations.DeleteAllAsync( ); } + public async Task BuildExistsAsync(string buildId, CancellationToken cancellationToken = default) + { + return await _builds.ExistsAsync(b => b.Id == buildId, cancellationToken); + } + public async Task UpdateBuildStatusAsync( string buildId, BuildProgressStatusContract progressStatus, diff --git a/src/Serval/src/Serval.Translation/Services/UsfmGenerationService.cs b/src/Serval/src/Serval.Translation/Services/UsfmGenerationService.cs index bb68e371b..a59746c43 100644 --- a/src/Serval/src/Serval.Translation/Services/UsfmGenerationService.cs +++ b/src/Serval/src/Serval.Translation/Services/UsfmGenerationService.cs @@ -258,13 +258,13 @@ bool isSource .Where(row => row.Refs.Any()) .OrderBy(row => row.Refs[0]) .ToArray(), - isSource ? sourceSettings?.FullName : targetSettings?.FullName, - textBehavior, - paragraphBehavior, - embedBehavior, - styleBehavior, + fullName: isSource ? sourceSettings?.FullName : targetSettings?.FullName, + textBehavior: textBehavior, + paragraphBehavior: paragraphBehavior, + embedBehavior: embedBehavior, + styleBehavior: styleBehavior, updateBlockHandlers: updateBlockHandlers, - remarks: remarks, + remarks: remarks?.Select(r => (0, r)), errorHandler: (_) => true, compareSegments: isSource ) ?? ""; @@ -398,7 +398,10 @@ private static string DenormalizeQuotationMarks(string usfm, string quoteConvent } remarks.Add(quotationDenormalizationRemark); - var updater = new UpdateUsfmParserHandler(updateBlockHandlers: [quotationMarkDenormalizer], remarks: remarks); + var updater = new UpdateUsfmParserHandler( + updateBlockHandlers: [quotationMarkDenormalizer], + remarks: remarks.Select(r => (0, r)) + ); UsfmParser.Parse(usfm, updater); usfm = updater.GetUsfm(); diff --git a/src/Serval/src/Serval.WordAlignment.Contracts/IWordAlignmentPlatformService.cs b/src/Serval/src/Serval.WordAlignment.Contracts/IWordAlignmentPlatformService.cs index 3a6b7725f..3aff4f4ab 100644 --- a/src/Serval/src/Serval.WordAlignment.Contracts/IWordAlignmentPlatformService.cs +++ b/src/Serval/src/Serval.WordAlignment.Contracts/IWordAlignmentPlatformService.cs @@ -22,6 +22,7 @@ Task UpdateBuildStatusAsync( CancellationToken cancellationToken = default ); Task UpdateBuildStatusAsync(string buildId, int step, CancellationToken cancellationToken = default); + Task BuildExistsAsync(string buildId, CancellationToken cancellationToken = default); Task IncrementEngineCorpusSizeAsync(string engineId, int count = 1, CancellationToken cancellationToken = default); Task InsertWordAlignmentsAsync( string engineId, diff --git a/src/Serval/src/Serval.WordAlignment/Services/PlatformService.cs b/src/Serval/src/Serval.WordAlignment/Services/PlatformService.cs index a67bc6a17..100aaaee5 100644 --- a/src/Serval/src/Serval.WordAlignment/Services/PlatformService.cs +++ b/src/Serval/src/Serval.WordAlignment/Services/PlatformService.cs @@ -224,6 +224,11 @@ await _wordAlignments.DeleteAllAsync( ); } + public async Task BuildExistsAsync(string buildId, CancellationToken cancellationToken = default) + { + return await _builds.ExistsAsync(b => b.Id == buildId, cancellationToken); + } + public async Task UpdateBuildStatusAsync( string buildId, BuildProgressStatusContract progressStatus, diff --git a/src/Serval/test/Serval.E2ETests/ServalApiTests.cs b/src/Serval/test/Serval.E2ETests/ServalApiTests.cs index 157b5876a..7e2fd7f7b 100644 --- a/src/Serval/test/Serval.E2ETests/ServalApiTests.cs +++ b/src/Serval/test/Serval.E2ETests/ServalApiTests.cs @@ -26,7 +26,9 @@ public async Task Echo_LegacyCorpus() string engineId = await _helperClient.CreateNewEngineAsync("Echo", "es", "es", "Echo1"); string[] books = ["1JN.txt", "2JN.txt", "3JN.txt"]; string corpusId = await _helperClient.AddLegacyCorpusToEngineAsync(engineId, books, "es", "es", true); - await _helperClient.BuildEngineAsync(engineId); + string buildId = await _helperClient.BuildEngineAsync(engineId); + TranslationBuild build = await _helperClient.TranslationEnginesClient.GetBuildAsync(engineId, buildId); + Assert.That(build.State, Is.EqualTo(JobState.Completed), JsonSerializer.Serialize(build)); // Test Pretranslation IList pretranslations = @@ -65,7 +67,9 @@ public async Task Echo_ParallelCorpus(bool paratext) corpusId = await _helperClient.AddParallelTextCorpusToEngineAsync(engineId, pretranslateCorpus, true); } - await _helperClient.BuildEngineAsync(engineId); + string buildId = await _helperClient.BuildEngineAsync(engineId); + TranslationBuild build = await _helperClient.TranslationEnginesClient.GetBuildAsync(engineId, buildId); + Assert.That(build.State, Is.EqualTo(JobState.Completed), JsonSerializer.Serialize(build)); // Test Pretranslation IList pretranslations = await _helperClient.TranslationEnginesClient.GetAllPretranslationsAsync( @@ -95,7 +99,9 @@ public async Task Echo_WordAlignment(bool paratext) await _helperClient.AddParallelTextCorpusToEngineAsync(engineId, trainCorpus, false); } - await _helperClient.BuildEngineAsync(engineId); + string buildId = await _helperClient.BuildEngineAsync(engineId); + WordAlignmentBuild build = await _helperClient.WordAlignmentEnginesClient.GetBuildAsync(engineId, buildId); + Assert.That(build.State, Is.EqualTo(JobState.Completed), JsonSerializer.Serialize(build)); WordAlignmentResult tResult = await _helperClient.WordAlignmentEnginesClient.AlignAsync( engineId, new WordAlignmentRequest { SourceSegment = "espíritu verdad", TargetSegment = "espíritu verdad" } @@ -163,6 +169,8 @@ public async Task Nmt_Batch() // Validate an NMT build using text files string buildId = await _helperClient.BuildEngineAsync(engineId); + TranslationBuild build = await _helperClient.TranslationEnginesClient.GetBuildAsync(engineId, buildId); + Assert.That(build.State, Is.EqualTo(JobState.Completed), JsonSerializer.Serialize(build)); await Task.Delay(1000); IList lTrans1 = await _helperClient.TranslationEnginesClient.GetAllPretranslationsAsync( engineId, @@ -174,7 +182,6 @@ public async Task Nmt_Batch() cId2 ); - TranslationBuild build = await _helperClient.TranslationEnginesClient.GetBuildAsync(engineId, buildId); Assert.That(build.ExecutionData, Is.Not.Null); var executionData = build.ExecutionData; @@ -199,7 +206,9 @@ public async Task Nmt_LargeBatchAndDownload() ); await _helperClient.AddParallelTextCorpusToEngineAsync(engineId, trainCorpus, false); string cId = await _helperClient.AddParallelTextCorpusToEngineAsync(engineId, pretranslateCorpus, true); - await _helperClient.BuildEngineAsync(engineId); + string buildId = await _helperClient.BuildEngineAsync(engineId); + TranslationBuild build = await _helperClient.TranslationEnginesClient.GetBuildAsync(engineId, buildId); + Assert.That(build.State, Is.EqualTo(JobState.Completed), JsonSerializer.Serialize(build)); await Task.Delay(1000); IList lTrans = await _helperClient.TranslationEnginesClient.GetAllPretranslationsAsync( engineId, @@ -280,11 +289,9 @@ public async Task Nmt_Paratext(bool withAdditionalFiles) _helperClient.TranslationBuildConfig.Options = "{\"max_steps\":50, \"use_key_terms\":true, \"parent_model_name\": \"facebook/nllb-200-distilled-600M\", \"train_params\": {\"per_device_train_batch_size\":4}, \"generate_params\":{\"num_beams\": 2}}"; - await _helperClient.BuildEngineAsync(engineId); - Assert.That( - (await _helperClient.TranslationEnginesClient.GetAllBuildsAsync(engineId)).First().State, - Is.EqualTo(JobState.Completed) - ); + string buildId = await _helperClient.BuildEngineAsync(engineId); + TranslationBuild build = await _helperClient.TranslationEnginesClient.GetBuildAsync(engineId, buildId); + Assert.That(build.State, Is.EqualTo(JobState.Completed), JsonSerializer.Serialize(build)); IList translations = await _helperClient.TranslationEnginesClient.GetAllPretranslationsAsync( engineId, @@ -442,7 +449,9 @@ public async Task Smt(bool legacyCorpus) false, legacyCorpus ); - await _helperClient.BuildEngineAsync(engineId); + string buildId = await _helperClient.BuildEngineAsync(engineId); + TranslationBuild build = await _helperClient.TranslationEnginesClient.GetBuildAsync(engineId, buildId); + Assert.That(build.State, Is.EqualTo(JobState.Completed), JsonSerializer.Serialize(build)); TranslationResult tResult = await _helperClient.TranslationEnginesClient.TranslateAsync( engineId, "verdad mundo" @@ -458,7 +467,9 @@ public async Task Smt(bool legacyCorpus) false, legacyCorpus ); - await _helperClient.BuildEngineAsync(engineId); + string buildId2 = await _helperClient.BuildEngineAsync(engineId); + TranslationBuild build2 = await _helperClient.TranslationEnginesClient.GetBuildAsync(engineId, buildId2); + Assert.That(build2.State, Is.EqualTo(JobState.Completed), JsonSerializer.Serialize(build2)); TranslationResult tResult2 = await _helperClient.TranslationEnginesClient.TranslateAsync( engineId, "verdad mundo" @@ -515,7 +526,9 @@ await _helperClient.AddTextCorpusToEngineAsync( false, legacyCorpus ); - await _helperClient.BuildEngineAsync(engineId); + string buildId3 = await _helperClient.BuildEngineAsync(engineId); + TranslationBuild build3 = await _helperClient.TranslationEnginesClient.GetBuildAsync(engineId, buildId3); + Assert.That(build3.State, Is.EqualTo(JobState.Completed), JsonSerializer.Serialize(build3)); WordGraph result = await _helperClient.TranslationEnginesClient.GetWordGraphAsync(engineId, "verdad"); Assert.That(result.SourceTokens, Has.Count.EqualTo(1)); @@ -544,6 +557,9 @@ public async Task WordAlignment() new WordAlignmentCorpusConfig() { ParallelCorpusId = corpusId }, ]; string buildId = await _helperClient.BuildEngineAsync(engineId); + WordAlignmentBuild build = await _helperClient.WordAlignmentEnginesClient.GetBuildAsync(engineId, buildId); + Assert.That(build.State, Is.EqualTo(JobState.Completed), JsonSerializer.Serialize(build)); + WordAlignmentResult tResult = await _helperClient.WordAlignmentEnginesClient.AlignAsync( engineId, new WordAlignmentRequest() { SourceSegment = "espíritu verdad", TargetSegment = "spirit truth" } @@ -568,7 +584,6 @@ public async Task WordAlignment() Assert.That(secondPair.Score, Is.EqualTo(0.9).Within(0.1)); }); - WordAlignmentBuild build = await _helperClient.WordAlignmentEnginesClient.GetBuildAsync(engineId, buildId); Assert.That(build.ExecutionData, Is.Not.Null); var executionData = build.ExecutionData;