From ba47c8f3e4ca3ac0942e93b1583bb245783fa708 Mon Sep 17 00:00:00 2001 From: Ksenia Nigmatulina Date: Wed, 27 Mar 2024 14:51:28 +0300 Subject: [PATCH 1/7] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=BB=D0=B5?= =?UTF-8?q?=D0=BD=D0=BE=20=D1=80=D0=B5=D1=88=D0=B5=D0=BD=D0=B8=D0=B5=203?= =?UTF-8?q?=20=D0=B4=D0=BE=D0=BC=D0=B0=D1=88=D0=BD=D0=B5=D0=B9=20=D1=80?= =?UTF-8?q?=D0=B0=D0=B1=D0=BE=D1=82=D1=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Himework3.Tests/Homework3.Tests.csproj | 23 ++ Homework3.Tests/Homework3.Tests.csproj | 23 ++ Homework3.Tests/MyTreadPoolTests.cs | 181 ++++++++++++++++ Homework3.Tests/Usings.cs | 1 + Homework3/EternalTaskException.cs | 16 ++ Homework3/Homework3.csproj | 9 + Homework3/Homework3.sln | 37 ++++ Homework3/IMyTask.cs | 10 + Homework3/MyThreadPool.cs | 289 +++++++++++++++++++++++++ 9 files changed, 589 insertions(+) create mode 100644 Himework3.Tests/Homework3.Tests.csproj create mode 100644 Homework3.Tests/Homework3.Tests.csproj create mode 100644 Homework3.Tests/MyTreadPoolTests.cs create mode 100644 Homework3.Tests/Usings.cs create mode 100644 Homework3/EternalTaskException.cs create mode 100644 Homework3/Homework3.csproj create mode 100644 Homework3/Homework3.sln create mode 100644 Homework3/IMyTask.cs create mode 100644 Homework3/MyThreadPool.cs diff --git a/Himework3.Tests/Homework3.Tests.csproj b/Himework3.Tests/Homework3.Tests.csproj new file mode 100644 index 0000000..4a01125 --- /dev/null +++ b/Himework3.Tests/Homework3.Tests.csproj @@ -0,0 +1,23 @@ + + + + net7.0 + enable + enable + + false + true + + + + + + + + + + + + + + diff --git a/Homework3.Tests/Homework3.Tests.csproj b/Homework3.Tests/Homework3.Tests.csproj new file mode 100644 index 0000000..4a01125 --- /dev/null +++ b/Homework3.Tests/Homework3.Tests.csproj @@ -0,0 +1,23 @@ + + + + net7.0 + enable + enable + + false + true + + + + + + + + + + + + + + diff --git a/Homework3.Tests/MyTreadPoolTests.cs b/Homework3.Tests/MyTreadPoolTests.cs new file mode 100644 index 0000000..2b192e9 --- /dev/null +++ b/Homework3.Tests/MyTreadPoolTests.cs @@ -0,0 +1,181 @@ +namespace Homework3.Tests; + +public class Tests +{ + private MyThreadPool myThreadPool; + private ManualResetEvent manualResetEvent; + private int threadSize = 8; + + [SetUp] + public void Setup() + { + myThreadPool = new MyThreadPool(threadSize, 10000); + manualResetEvent = new ManualResetEvent(false); + } + + [Test] + public void AreAllTheThreadsWorkingTest() + { + for (var i = 0; i < threadSize; ++i) + { + myThreadPool.AddTask(() => + { + manualResetEvent.WaitOne(); + return 0; + }); + } + Thread.Sleep(1000); + Assert.That(myThreadPool.WorkingThreads, Is.EqualTo(threadSize)); + manualResetEvent.Set(); + Thread.Sleep(100); + Assert.That(myThreadPool.WorkingThreads, Is.EqualTo(0)); + } + + [Test] + public void FullLoadTest() + { + var tasks = new IMyTask[threadSize * 5]; + var correctResults = new int[threadSize * 5]; + for (var i = 0; i < tasks.Length; ++i) + { + var localI = i; + tasks[i] = myThreadPool.AddTask(() => + { + return localI * localI; + }); + correctResults[i] = i * i; + } + Thread.Sleep(1000); + for (var i = 0; i < tasks.Length; ++i) + { + Assert.That(correctResults[i], Is.EqualTo(tasks[i].Result)); + } + } + + [Test] + public void ContinuationsTest() + { + var tasks = new IMyTask[threadSize]; + var continuations = new IMyTask[threadSize]; + var correctResults = new int[threadSize]; + for (var i = 0; i < 1; ++i) + { + var localI = i; + tasks[i] = myThreadPool.AddTask(() => + { + return localI; + }); + continuations[i] = tasks[i].ContinueWith(x => x + 1); + correctResults[i] = i + 1; + } + Thread.Sleep(100); + for (var i = 0; i < 1; ++i) + { + Assert.That(correctResults[i], Is.EqualTo(continuations[i].Result)); + } + } + + [Test] + public void ShutDownTest() + { + var tasks = new IMyTask[threadSize]; + var iterations = 100; + for (var i = 0; i < tasks.Length; ++i) + { + tasks[i] = myThreadPool.AddTask(() => + { + var sum = 0; + for (var i = 0; i < iterations; ++i) + { + ++sum; + } + return 0; + }); + } + myThreadPool.ShutDown(); + Assert.That(myThreadPool.IsTerminated, Is.EqualTo(true)); + Assert.Throws(() => myThreadPool + .AddTask(() => 0)); + } + + [Test] + public void ResultTest() + { + + var task = myThreadPool.AddTask(() => + { + var sum = 0; + for (var i = 0; i < 100; ++i) + { + sum += i; + } + return sum; + }); + Assert.That(task.Result, Is.EqualTo(4950)); + } + + [Test] + public void IsCompletedTest() + { + var task = myThreadPool.AddTask(() => + { + manualResetEvent.WaitOne(); + return 0; + }); + Assert.That(task.IsCompleted, Is.EqualTo(false)); + manualResetEvent.Set(); + Thread.Sleep(1000); + Assert.That(task.IsCompleted, Is.EqualTo(true)); + } + + [Test] + public void ConcurrentAccessTest() + { + var myTask = myThreadPool.AddTask(() => 0); + Thread.Sleep(100); + var taskArray = new IMyTask[threadSize * 2]; + for (var i = 0; i < threadSize * 2; ++i) + { + var localI = i; + taskArray[i] = myTask.ContinueWith((x) => + { + manualResetEvent.WaitOne(); + Thread.Sleep(100); + var result = x + localI; + return result; + }); + } + + Thread.Sleep(100); + manualResetEvent.Set(); + myThreadPool.ShutDown(); + for (var j = 0; j < threadSize + 2; ++j) + { + if (j < threadSize) + { + Assert.That(taskArray[j].Result, Is.EqualTo(j)); + } + else + { + Assert.Throws(() => + { var result = taskArray[j].Result; }); + } + } + } + + [Test] + public void MultipleContinuationTest() + { + + var task = myThreadPool.AddTask(() => + { + var i = 1; + for (var j = 0; j < 10; ++j) + { + i *= 2; + } + return i; + }).ContinueWith(i => i / 2).ContinueWith(i => i.ToString()); + Assert.That(task.Result, Is.EqualTo("512")); + } +} diff --git a/Homework3.Tests/Usings.cs b/Homework3.Tests/Usings.cs new file mode 100644 index 0000000..9a28bd8 --- /dev/null +++ b/Homework3.Tests/Usings.cs @@ -0,0 +1 @@ +global using NUnit.Framework; diff --git a/Homework3/EternalTaskException.cs b/Homework3/EternalTaskException.cs new file mode 100644 index 0000000..7c1c315 --- /dev/null +++ b/Homework3/EternalTaskException.cs @@ -0,0 +1,16 @@ + +/// +/// Thrown if the task hasn't been completed for some exact time after Shutdown. +/// +public class EternalTaskException : Exception +{ + /// + /// Initializes a new instance of the class. + /// + /// Error message. + public EternalTaskException(string message) + : base(message) + { + } +} + diff --git a/Homework3/Homework3.csproj b/Homework3/Homework3.csproj new file mode 100644 index 0000000..4658cbf --- /dev/null +++ b/Homework3/Homework3.csproj @@ -0,0 +1,9 @@ + + + + net7.0 + enable + enable + + + diff --git a/Homework3/Homework3.sln b/Homework3/Homework3.sln new file mode 100644 index 0000000..10fff3d --- /dev/null +++ b/Homework3/Homework3.sln @@ -0,0 +1,37 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 16 +VisualStudioVersion = 25.0.1706.3 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Homework3", "Homework3.csproj", "{6CD95AEA-46EF-4C7C-B3E3-C4B948912D59}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Homework3.Tests", "..\Himework3.Tests\Homework3.Tests.csproj", "{D38B90EC-639E-4C1F-83C0-4AC0EE8FB1FD}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Homework3.Tests", "..\Homework3.Tests\Homework3.Tests.csproj", "{4A061EE5-D780-4E8B-93E9-B6BE23A122F1}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {6CD95AEA-46EF-4C7C-B3E3-C4B948912D59}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {6CD95AEA-46EF-4C7C-B3E3-C4B948912D59}.Debug|Any CPU.Build.0 = Debug|Any CPU + {6CD95AEA-46EF-4C7C-B3E3-C4B948912D59}.Release|Any CPU.ActiveCfg = Release|Any CPU + {6CD95AEA-46EF-4C7C-B3E3-C4B948912D59}.Release|Any CPU.Build.0 = Release|Any CPU + {D38B90EC-639E-4C1F-83C0-4AC0EE8FB1FD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {D38B90EC-639E-4C1F-83C0-4AC0EE8FB1FD}.Debug|Any CPU.Build.0 = Debug|Any CPU + {D38B90EC-639E-4C1F-83C0-4AC0EE8FB1FD}.Release|Any CPU.ActiveCfg = Release|Any CPU + {D38B90EC-639E-4C1F-83C0-4AC0EE8FB1FD}.Release|Any CPU.Build.0 = Release|Any CPU + {4A061EE5-D780-4E8B-93E9-B6BE23A122F1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4A061EE5-D780-4E8B-93E9-B6BE23A122F1}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4A061EE5-D780-4E8B-93E9-B6BE23A122F1}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4A061EE5-D780-4E8B-93E9-B6BE23A122F1}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {44148E15-051F-4366-A340-D677BCF6B98B} + EndGlobalSection +EndGlobal diff --git a/Homework3/IMyTask.cs b/Homework3/IMyTask.cs new file mode 100644 index 0000000..3ce7e31 --- /dev/null +++ b/Homework3/IMyTask.cs @@ -0,0 +1,10 @@ + +public interface IMyTask +{ + public bool IsCompleted { get; } + + public T1 Result { get; } + + public IMyTask ContinueWith(Func func); +} + diff --git a/Homework3/MyThreadPool.cs b/Homework3/MyThreadPool.cs new file mode 100644 index 0000000..15b7487 --- /dev/null +++ b/Homework3/MyThreadPool.cs @@ -0,0 +1,289 @@ +namespace Homework3; + +using System; +using System.Collections.Concurrent; +using System.Threading; + +/// +/// Class which enables to perform several tasks parallely. +/// +public class MyThreadPool +{ + private int maxTimeForCompleteJointPerThread; + private Thread[] threads; + private CancellationTokenSource cancellationTokenSource; + private AutoResetEvent newTaskIsAwaiting; + private ManualResetEvent areAnyTasksInQueue; + private WaitHandle[] waitHandlers; + private object synchronizationObject; + private int workingThreads; + private volatile bool[] isWorking; + + /// + /// Instantiates a new instance of MyThreadPool class. + /// + public MyThreadPool(int numberOfThreads, int maxTimeForCompleteJointPerThread = 1000) + { + if (numberOfThreads <= 0) + { + throw new InvalidDataException(); + } + + isWorking = new bool[numberOfThreads]; + Tasks = new ConcurrentQueue(); + threads = new Thread[numberOfThreads]; + cancellationTokenSource = new CancellationTokenSource(); + newTaskIsAwaiting = new AutoResetEvent(false); + areAnyTasksInQueue = new ManualResetEvent(false); + waitHandlers = new WaitHandle[2]{newTaskIsAwaiting, + areAnyTasksInQueue}; + this.maxTimeForCompleteJointPerThread = maxTimeForCompleteJointPerThread; + synchronizationObject = new object(); + Start(); + } + + public ConcurrentQueue Tasks { get; private set; } + + /// + /// Returns a number of threads, which calculates task at the current moment. + /// + public int WorkingThreads { get => workingThreads; } + + /// + /// Indicates if the thread pool is active. + /// + public bool IsTerminated { get; private set; } + + private void Start() + { + for (var i = 0; i < threads.Length; ++i) + { + var localI = i; + threads[i] = new Thread(() => + { + while (!cancellationTokenSource.IsCancellationRequested) + { + WaitHandle.WaitAny(waitHandlers); + if (cancellationTokenSource.IsCancellationRequested) + { + break; + } + + lock (Tasks) + { + if (Tasks.Count > 0) + { + areAnyTasksInQueue.Set(); + } + else + { + areAnyTasksInQueue.Reset(); + } + } + + if (!Tasks.IsEmpty) + { + var isAvailable = Tasks.TryDequeue(out var action); + if (isAvailable && action != null) + { + Interlocked.Increment(ref workingThreads); + isWorking[localI] = true; + action.Invoke(); + isWorking[localI] = false; + Interlocked.Decrement(ref workingThreads); + action = null; + } + } + + lock (Tasks) + { + if (Tasks.Count > 0) + { + areAnyTasksInQueue.Set(); + } + else + { + areAnyTasksInQueue.Reset(); + } + } + } + }); + threads[i].Start(); + } + } + + /// + /// Add new task item to the queue of tasks. + /// + /// + /// + /// + /// + /// + public IMyTask AddTask(Func function, ManualResetEvent? isUpperTaskCompleted = null) + { + lock (synchronizationObject) + { + if (cancellationTokenSource.IsCancellationRequested) + { + throw new InvalidOperationException("Shutdown has been requested"); + } + + + var myTask = isUpperTaskCompleted == null ? new MyTask(function, this) + : new MyTask(function, this, isUpperTaskCompleted); + lock (Tasks) + { + Tasks.Enqueue(myTask.Performe); + } + + newTaskIsAwaiting.Set(); + return myTask; + } + } + + /// + /// Terminates the work of thread pool and makes threads to performe already started tasks. + /// + /// + public void ShutDown() + { + lock (synchronizationObject) + { + cancellationTokenSource.Cancel(); + areAnyTasksInQueue.Set(); + for (var i = 0; i < this.threads.Length; ++i) + { + threads[i].Join(maxTimeForCompleteJointPerThread); + if (isWorking[i]) + { + throw new EternalTaskException("The task hasn't been finished" + + $" for {this.maxTimeForCompleteJointPerThread} milliseconds"); + } + } + + IsTerminated = true; + } + } + + private class MyTask : IMyTask + { + private Func function; + private T1? result; + private bool isResultReady; + private MyThreadPool threadPool; + + private List continuations; + + private ManualResetEvent accessToResult; + private ManualResetEvent? isUpperTaskCompleted; + private ManualResetEvent manualResetEventForContinuations; + + private Exception? exception; + private MyThreadPool myThreadPool; + + /// + /// Initializes a new instance of the class. + /// + /// Task function. + /// ThreadPool that will execute this task. + /// Parental task ManualResetEvent (for continuation task). + public MyTask(Func function, MyThreadPool myThreadPool, + ManualResetEvent? manualResetEventForContinuations = null) + { + this.function = function; + continuations = new List(); + accessToResult = new ManualResetEvent(false); + this.myThreadPool = myThreadPool; + isUpperTaskCompleted = manualResetEventForContinuations; + this.manualResetEventForContinuations = new ManualResetEvent(false); + threadPool = myThreadPool; + } + + /// + /// Returns true value if task has been already calculated. + /// + public bool IsCompleted => isResultReady; + + /// + /// Returns the task result. + /// + public T1 Result + { + get + { + if (myThreadPool.IsTerminated && !isResultReady) + { + throw new InvalidOperationException("Hasn't been started when the Shutdown was requested."); + } + + accessToResult.WaitOne(); + if (exception != null) + { + throw new AggregateException(exception); + } + + return result!; + } + } + + /// + /// Creates a new task which operates with the result of this task. + /// + /// Value type for the continuation result. + /// Function for creating a new task. + /// Task with new return value type of the function. + public IMyTask ContinueWith(Func func) + { + lock (threadPool.Tasks) + { + if (result != null) + { + return myThreadPool.AddTask(() => func(Result), manualResetEventForContinuations); + } + + var continuation = new MyTask( + () => func(Result), + myThreadPool, + manualResetEventForContinuations); + continuations.Add(continuation.Performe); + return continuation; + } + } + + /// + /// Calculates task. + /// + public void Performe() + { + try + { + if (isUpperTaskCompleted != null) + { + isUpperTaskCompleted!.WaitOne(); + } + + result = function(); + + lock (threadPool.Tasks) + { + if (continuations.Count > 0) + { + foreach (var continuation in continuations) + { + myThreadPool.AddTask(() => continuation, isUpperTaskCompleted); + } + } + } + } + catch (Exception ex) + { + exception = ex; + } + + isResultReady = true; + accessToResult.Set(); + manualResetEventForContinuations.Set(); + } + } +} From a020537627e1e60db168824bb8b05ea2e62a4360 Mon Sep 17 00:00:00 2001 From: Ksenia Nigmatulina Date: Wed, 27 Mar 2024 15:04:08 +0300 Subject: [PATCH 2/7] =?UTF-8?q?=D0=A3=D0=B4=D0=B0=D0=BB=D0=B8=D0=BB=D0=B0?= =?UTF-8?q?=20=D0=BB=D0=B8=D1=88=D0=BD=D0=B8=D0=B9=20=D1=84=D0=B0=D0=B9?= =?UTF-8?q?=D0=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Himework3.Tests/Homework3.Tests.csproj | 23 ----------------------- 1 file changed, 23 deletions(-) delete mode 100644 Himework3.Tests/Homework3.Tests.csproj diff --git a/Himework3.Tests/Homework3.Tests.csproj b/Himework3.Tests/Homework3.Tests.csproj deleted file mode 100644 index 4a01125..0000000 --- a/Himework3.Tests/Homework3.Tests.csproj +++ /dev/null @@ -1,23 +0,0 @@ - - - - net7.0 - enable - enable - - false - true - - - - - - - - - - - - - - From 66cc5da16f091cd1b2346da188173191901f23f6 Mon Sep 17 00:00:00 2001 From: Ksenia Nigmatulina Date: Wed, 3 Apr 2024 01:58:37 +0300 Subject: [PATCH 3/7] =?UTF-8?q?=D0=92=D1=81=D0=B5=20=D0=BF=D0=B5=D1=80?= =?UTF-8?q?=D0=B5=D0=B4=D0=B5=D0=BB=D0=B0=D0=BB=D0=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Homework3.Tests/MyTreadPoolTests.cs | 185 +++++----------- Homework3/Homework3.sln | 16 +- Homework3/IMyTask.cs | 28 ++- Homework3/MyThreadPool.cs | 324 +++++++++------------------- 4 files changed, 179 insertions(+), 374 deletions(-) diff --git a/Homework3.Tests/MyTreadPoolTests.cs b/Homework3.Tests/MyTreadPoolTests.cs index 2b192e9..72fc293 100644 --- a/Homework3.Tests/MyTreadPoolTests.cs +++ b/Homework3.Tests/MyTreadPoolTests.cs @@ -1,181 +1,96 @@ -namespace Homework3.Tests; +using static Homework3.MyThreadPool; -public class Tests +namespace Homework3.Tests; + +class Tests { - private MyThreadPool myThreadPool; - private ManualResetEvent manualResetEvent; - private int threadSize = 8; + readonly int amountOfThreads = 5; + MyThreadPool threadPool; [SetUp] - public void Setup() + public void SetUp() { - myThreadPool = new MyThreadPool(threadSize, 10000); - manualResetEvent = new ManualResetEvent(false); + threadPool = new(amountOfThreads); } [Test] - public void AreAllTheThreadsWorkingTest() + public void ShutdownTest() { - for (var i = 0; i < threadSize; ++i) - { - myThreadPool.AddTask(() => - { - manualResetEvent.WaitOne(); - return 0; - }); - } + var myTask1 = threadPool.Submit(() => 2 * 2)!.ContinueWith(x => x.ToString()); Thread.Sleep(1000); - Assert.That(myThreadPool.WorkingThreads, Is.EqualTo(threadSize)); - manualResetEvent.Set(); + threadPool.Shutdown(); Thread.Sleep(100); - Assert.That(myThreadPool.WorkingThreads, Is.EqualTo(0)); + Assert.Throws(() => threadPool.Submit(() => 2 * 2)!.ContinueWith(x => x.ToString())); } [Test] - public void FullLoadTest() + public void SubmitTest() { - var tasks = new IMyTask[threadSize * 5]; - var correctResults = new int[threadSize * 5]; - for (var i = 0; i < tasks.Length; ++i) - { - var localI = i; - tasks[i] = myThreadPool.AddTask(() => - { - return localI * localI; - }); - correctResults[i] = i * i; - } + var myTask = threadPool.Submit(() => 2 * 2)!; Thread.Sleep(1000); - for (var i = 0; i < tasks.Length; ++i) + Assert.Multiple(() => { - Assert.That(correctResults[i], Is.EqualTo(tasks[i].Result)); - } - } - - [Test] - public void ContinuationsTest() - { - var tasks = new IMyTask[threadSize]; - var continuations = new IMyTask[threadSize]; - var correctResults = new int[threadSize]; - for (var i = 0; i < 1; ++i) - { - var localI = i; - tasks[i] = myThreadPool.AddTask(() => - { - return localI; - }); - continuations[i] = tasks[i].ContinueWith(x => x + 1); - correctResults[i] = i + 1; - } - Thread.Sleep(100); - for (var i = 0; i < 1; ++i) - { - Assert.That(correctResults[i], Is.EqualTo(continuations[i].Result)); - } + Assert.That(myTask.IsCompleted(), Is.EqualTo(true)); + Assert.That(myTask.Result(), Is.EqualTo(4)); + }); } [Test] - public void ShutDownTest() + public void SubmitPlusContinueWithTest() { - var tasks = new IMyTask[threadSize]; - var iterations = 100; - for (var i = 0; i < tasks.Length; ++i) + var myTask = threadPool.Submit(() => 2 * 2)!.ContinueWith(x => x.ToString()); + Thread.Sleep(1000); + Assert.Multiple(() => { - tasks[i] = myThreadPool.AddTask(() => - { - var sum = 0; - for (var i = 0; i < iterations; ++i) - { - ++sum; - } - return 0; - }); - } - myThreadPool.ShutDown(); - Assert.That(myThreadPool.IsTerminated, Is.EqualTo(true)); - Assert.Throws(() => myThreadPool - .AddTask(() => 0)); + Assert.That(myTask.IsCompleted(), Is.EqualTo(true)); + Assert.That(myTask.Result(), Is.EqualTo("4")); + }); } [Test] - public void ResultTest() + public void ExceptionTest() { - - var task = myThreadPool.AddTask(() => + var myTask = threadPool.Submit(() => { - var sum = 0; - for (var i = 0; i < 100; ++i) - { - sum += i; - } - return sum; + throw new InvalidOperationException(); + return 4; //without this string test doesn't work }); - Assert.That(task.Result, Is.EqualTo(4950)); + Assert.Throws(() => myTask!.Result()); } [Test] - public void IsCompletedTest() + public void ThreeContinueWithTest() { - var task = myThreadPool.AddTask(() => + var myTask = threadPool.Submit(() => 2 * 2)!.ContinueWith(x => x.ToString()).ContinueWith(y => y + " "); + Thread.Sleep(1000); + Assert.Multiple(() => { - manualResetEvent.WaitOne(); - return 0; + Assert.That(myTask.IsCompleted(), Is.EqualTo(true)); + Assert.That(myTask.Result(), Is.EqualTo("4 ")); }); - Assert.That(task.IsCompleted, Is.EqualTo(false)); - manualResetEvent.Set(); - Thread.Sleep(1000); - Assert.That(task.IsCompleted, Is.EqualTo(true)); } [Test] - public void ConcurrentAccessTest() + public void ThereNThreadsInMyThreadPoolTest() { - var myTask = myThreadPool.AddTask(() => 0); - Thread.Sleep(100); - var taskArray = new IMyTask[threadSize * 2]; - for (var i = 0; i < threadSize * 2; ++i) + var threadsId = new int[amountOfThreads]; + for (int i = 0; i < amountOfThreads; i++) { - var localI = i; - taskArray[i] = myTask.ContinueWith((x) => + int localI = i; + threadPool.Submit(() => { - manualResetEvent.WaitOne(); - Thread.Sleep(100); - var result = x + localI; - return result; + threadsId[localI] = Environment.CurrentManagedThreadId; + Thread.Sleep(1000); + return 2 * 2; }); } - - Thread.Sleep(100); - manualResetEvent.Set(); - myThreadPool.ShutDown(); - for (var j = 0; j < threadSize + 2; ++j) + Dictionary dict = new(); + for (int i = 0; i < amountOfThreads; i++) { - if (j < threadSize) - { - Assert.That(taskArray[j].Result, Is.EqualTo(j)); - } - else - { - Assert.Throws(() => - { var result = taskArray[j].Result; }); - } + dict.Add(i, threadsId[i]); } - } + var values = dict.Select(x => x.Value).ToArray(); - [Test] - public void MultipleContinuationTest() - { - - var task = myThreadPool.AddTask(() => - { - var i = 1; - for (var j = 0; j < 10; ++j) - { - i *= 2; - } - return i; - }).ContinueWith(i => i / 2).ContinueWith(i => i.ToString()); - Assert.That(task.Result, Is.EqualTo("512")); + Assert.That(values.Count, Is.EqualTo(amountOfThreads)); } -} +} \ No newline at end of file diff --git a/Homework3/Homework3.sln b/Homework3/Homework3.sln index 10fff3d..72dec81 100644 --- a/Homework3/Homework3.sln +++ b/Homework3/Homework3.sln @@ -5,9 +5,7 @@ VisualStudioVersion = 25.0.1706.3 MinimumVisualStudioVersion = 10.0.40219.1 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Homework3", "Homework3.csproj", "{6CD95AEA-46EF-4C7C-B3E3-C4B948912D59}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Homework3.Tests", "..\Himework3.Tests\Homework3.Tests.csproj", "{D38B90EC-639E-4C1F-83C0-4AC0EE8FB1FD}" -EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Homework3.Tests", "..\Homework3.Tests\Homework3.Tests.csproj", "{4A061EE5-D780-4E8B-93E9-B6BE23A122F1}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Homework3.Tests", "..\Homework3.Tests\Homework3.Tests.csproj", "{19B0F252-AE39-4460-A8C3-ACD58D24B58B}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -19,14 +17,10 @@ Global {6CD95AEA-46EF-4C7C-B3E3-C4B948912D59}.Debug|Any CPU.Build.0 = Debug|Any CPU {6CD95AEA-46EF-4C7C-B3E3-C4B948912D59}.Release|Any CPU.ActiveCfg = Release|Any CPU {6CD95AEA-46EF-4C7C-B3E3-C4B948912D59}.Release|Any CPU.Build.0 = Release|Any CPU - {D38B90EC-639E-4C1F-83C0-4AC0EE8FB1FD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {D38B90EC-639E-4C1F-83C0-4AC0EE8FB1FD}.Debug|Any CPU.Build.0 = Debug|Any CPU - {D38B90EC-639E-4C1F-83C0-4AC0EE8FB1FD}.Release|Any CPU.ActiveCfg = Release|Any CPU - {D38B90EC-639E-4C1F-83C0-4AC0EE8FB1FD}.Release|Any CPU.Build.0 = Release|Any CPU - {4A061EE5-D780-4E8B-93E9-B6BE23A122F1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {4A061EE5-D780-4E8B-93E9-B6BE23A122F1}.Debug|Any CPU.Build.0 = Debug|Any CPU - {4A061EE5-D780-4E8B-93E9-B6BE23A122F1}.Release|Any CPU.ActiveCfg = Release|Any CPU - {4A061EE5-D780-4E8B-93E9-B6BE23A122F1}.Release|Any CPU.Build.0 = Release|Any CPU + {19B0F252-AE39-4460-A8C3-ACD58D24B58B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {19B0F252-AE39-4460-A8C3-ACD58D24B58B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {19B0F252-AE39-4460-A8C3-ACD58D24B58B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {19B0F252-AE39-4460-A8C3-ACD58D24B58B}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/Homework3/IMyTask.cs b/Homework3/IMyTask.cs index 3ce7e31..d0350ae 100644 --- a/Homework3/IMyTask.cs +++ b/Homework3/IMyTask.cs @@ -1,10 +1,24 @@ - -public interface IMyTask -{ - public bool IsCompleted { get; } +using static Homework3.MyThreadPool; - public T1 Result { get; } +public interface IMyTask +{ + /// + /// Returns true, if task is completed. + /// + /// + public bool IsCompleted(); - public IMyTask ContinueWith(Func func); -} + /// + /// Return result of the task. + /// + /// + public T Result(); + /// + /// Return new task which operates with the previous result. + /// + /// + /// + /// + public MyTask ContinueWith(Func continueFunction); +} \ No newline at end of file diff --git a/Homework3/MyThreadPool.cs b/Homework3/MyThreadPool.cs index 15b7487..698641c 100644 --- a/Homework3/MyThreadPool.cs +++ b/Homework3/MyThreadPool.cs @@ -2,288 +2,170 @@ using System; using System.Collections.Concurrent; -using System.Threading; -/// -/// Class which enables to perform several tasks parallely. -/// public class MyThreadPool { - private int maxTimeForCompleteJointPerThread; - private Thread[] threads; - private CancellationTokenSource cancellationTokenSource; - private AutoResetEvent newTaskIsAwaiting; - private ManualResetEvent areAnyTasksInQueue; - private WaitHandle[] waitHandlers; - private object synchronizationObject; - private int workingThreads; - private volatile bool[] isWorking; + private readonly AutoResetEvent threadHandler; + private readonly ConcurrentQueue taskQueque; + private readonly CancellationTokenSource cancellationTokenSource; + private int amountOfCompletedTasks = 0; + private int amountOfWorkingThreads = 0; /// - /// Instantiates a new instance of MyThreadPool class. + /// Creates new instance of MyThreadPool class. /// - public MyThreadPool(int numberOfThreads, int maxTimeForCompleteJointPerThread = 1000) + /// + public MyThreadPool(int amountOfThreads) { - if (numberOfThreads <= 0) - { - throw new InvalidDataException(); - } - - isWorking = new bool[numberOfThreads]; - Tasks = new ConcurrentQueue(); - threads = new Thread[numberOfThreads]; - cancellationTokenSource = new CancellationTokenSource(); - newTaskIsAwaiting = new AutoResetEvent(false); - areAnyTasksInQueue = new ManualResetEvent(false); - waitHandlers = new WaitHandle[2]{newTaskIsAwaiting, - areAnyTasksInQueue}; - this.maxTimeForCompleteJointPerThread = maxTimeForCompleteJointPerThread; - synchronizationObject = new object(); - Start(); - } - - public ConcurrentQueue Tasks { get; private set; } + threadHandler = new(false); + taskQueque = new(); - /// - /// Returns a number of threads, which calculates task at the current moment. - /// - public int WorkingThreads { get => workingThreads; } - - /// - /// Indicates if the thread pool is active. - /// - public bool IsTerminated { get; private set; } + cancellationTokenSource = new(); + var threads = new Thread[amountOfThreads]; - private void Start() - { - for (var i = 0; i < threads.Length; ++i) + for (int i = 0; i < amountOfThreads; i++) { - var localI = i; + int localI = i; threads[i] = new Thread(() => { - while (!cancellationTokenSource.IsCancellationRequested) - { - WaitHandle.WaitAny(waitHandlers); - if (cancellationTokenSource.IsCancellationRequested) - { - break; - } - - lock (Tasks) - { - if (Tasks.Count > 0) - { - areAnyTasksInQueue.Set(); - } - else - { - areAnyTasksInQueue.Reset(); - } - } - - if (!Tasks.IsEmpty) - { - var isAvailable = Tasks.TryDequeue(out var action); - if (isAvailable && action != null) - { - Interlocked.Increment(ref workingThreads); - isWorking[localI] = true; - action.Invoke(); - isWorking[localI] = false; - Interlocked.Decrement(ref workingThreads); - action = null; - } - } + TasksPerforming(); + } + ); + } - lock (Tasks) - { - if (Tasks.Count > 0) - { - areAnyTasksInQueue.Set(); - } - else - { - areAnyTasksInQueue.Reset(); - } - } - } - }); - threads[i].Start(); + foreach (var thread in threads) + { + thread.Start(); } + } /// - /// Add new task item to the queue of tasks. + /// Returns new task if cancellationToken wasn't requested or throws exception /// - /// + /// /// - /// /// /// - public IMyTask AddTask(Func function, ManualResetEvent? isUpperTaskCompleted = null) + public MyTask? Submit(Func function) { - lock (synchronizationObject) + if (!cancellationTokenSource.IsCancellationRequested) { - if (cancellationTokenSource.IsCancellationRequested) - { - throw new InvalidOperationException("Shutdown has been requested"); - } - - - var myTask = isUpperTaskCompleted == null ? new MyTask(function, this) - : new MyTask(function, this, isUpperTaskCompleted); - lock (Tasks) - { - Tasks.Enqueue(myTask.Performe); - } - - newTaskIsAwaiting.Set(); - return myTask; + return new MyTask(this, function); } + throw new InvalidOperationException("Shutdown was requested, threadpool stoped calculating"); } /// - /// Terminates the work of thread pool and makes threads to performe already started tasks. + /// Terminates the threads: already running tasks are not interrupted, but new tasks are not accepted for execution by threads from the pool. /// - /// - public void ShutDown() + public void Shutdown() { - lock (synchronizationObject) + cancellationTokenSource.Cancel(); + while (amountOfWorkingThreads != 0) { - cancellationTokenSource.Cancel(); - areAnyTasksInQueue.Set(); - for (var i = 0; i < this.threads.Length; ++i) - { - threads[i].Join(maxTimeForCompleteJointPerThread); - if (isWorking[i]) - { - throw new EternalTaskException("The task hasn't been finished" + - $" for {this.maxTimeForCompleteJointPerThread} milliseconds"); - } - } - - IsTerminated = true; + continue; } } - private class MyTask : IMyTask + private void TasksPerforming() { - private Func function; - private T1? result; - private bool isResultReady; - private MyThreadPool threadPool; - - private List continuations; - private ManualResetEvent accessToResult; - private ManualResetEvent? isUpperTaskCompleted; - private ManualResetEvent manualResetEventForContinuations; - - private Exception? exception; - private MyThreadPool myThreadPool; - - /// - /// Initializes a new instance of the class. - /// - /// Task function. - /// ThreadPool that will execute this task. - /// Parental task ManualResetEvent (for continuation task). - public MyTask(Func function, MyThreadPool myThreadPool, - ManualResetEvent? manualResetEventForContinuations = null) + if (!taskQueque.IsEmpty) { - this.function = function; - continuations = new List(); - accessToResult = new ManualResetEvent(false); - this.myThreadPool = myThreadPool; - isUpperTaskCompleted = manualResetEventForContinuations; - this.manualResetEventForContinuations = new ManualResetEvent(false); - threadPool = myThreadPool; + var isSuccessullyRemoved = taskQueque.TryDequeue(out Action? newTask); + if (isSuccessullyRemoved) + { + threadHandler.Set(); + Interlocked.Add(ref amountOfWorkingThreads, 1); + newTask!.Invoke(); + Interlocked.Add(ref amountOfCompletedTasks, 1); + Interlocked.Add(ref amountOfWorkingThreads, -1); + threadHandler.WaitOne(); + } } + } - /// - /// Returns true value if task has been already calculated. - /// - public bool IsCompleted => isResultReady; + /// + /// Task for MyThreadPoolClass + /// + /// + public class MyTask : IMyTask + { + private T result; + private MyThreadPool myThreadPool; + private Exception? exception = null; + private AutoResetEvent myTaskThreadHandler = new(true); + private volatile bool isResultReady = false; /// - /// Returns the task result. + /// Creates new instance of MyTask class. /// - public T1 Result + /// + /// + public MyTask(MyThreadPool myThreadPool, Func function) { - get + this.myThreadPool = myThreadPool; + this.myThreadPool.taskQueque.Enqueue(new Action(() => { - if (myThreadPool.IsTerminated && !isResultReady) + try { - throw new InvalidOperationException("Hasn't been started when the Shutdown was requested."); + result = function.Invoke(); } - - accessToResult.WaitOne(); - if (exception != null) + catch (Exception e) { - throw new AggregateException(exception); + exception = e; } - - return result!; - } + finally + { + isResultReady = true; //volatile: writes all values in memory + myTaskThreadHandler.Set(); + } + })); + myThreadPool.TasksPerforming(); } /// - /// Creates a new task which operates with the result of this task. + /// Returns true, if task is completed. /// - /// Value type for the continuation result. - /// Function for creating a new task. - /// Task with new return value type of the function. - public IMyTask ContinueWith(Func func) + /// + public bool IsCompleted() { - lock (threadPool.Tasks) - { - if (result != null) - { - return myThreadPool.AddTask(() => func(Result), manualResetEventForContinuations); - } - - var continuation = new MyTask( - () => func(Result), - myThreadPool, - manualResetEventForContinuations); - continuations.Add(continuation.Performe); - return continuation; - } + return isResultReady; } /// - /// Calculates task. + /// Return result of the task. /// - public void Performe() + /// + public T Result() { - try + if (!isResultReady) { - if (isUpperTaskCompleted != null) - { - isUpperTaskCompleted!.WaitOne(); - } - - result = function(); - - lock (threadPool.Tasks) - { - if (continuations.Count > 0) - { - foreach (var continuation in continuations) - { - myThreadPool.AddTask(() => continuation, isUpperTaskCompleted); - } - } - } + myTaskThreadHandler.Reset(); } - catch (Exception ex) + if (Volatile.Read(ref exception) == null) { - exception = ex; + return result; } + throw new AggregateException("", exception!); + } - isResultReady = true; - accessToResult.Set(); - manualResetEventForContinuations.Set(); + /// + /// Return new task which operates with the previous result. + /// + /// + /// + /// + public MyTask ContinueWith(Func continueFunction) + { + if (!myThreadPool.cancellationTokenSource.IsCancellationRequested) + { + var nextFunction = new Func(() => continueFunction(Result())); + return new MyTask(myThreadPool, nextFunction); + } + throw new InvalidOperationException("Shutdown was requested, threadpool stoped calculating"); } } -} + +} \ No newline at end of file From 1a2d04d5150e67b61396bf22d050c644bda5a819 Mon Sep 17 00:00:00 2001 From: Ksenia Nigmatulina Date: Fri, 5 Apr 2024 15:52:48 +0300 Subject: [PATCH 4/7] auto -> manual --- Homework3.Tests/MyTreadPoolTests.cs | 3 +-- Homework3/MyThreadPool.cs | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/Homework3.Tests/MyTreadPoolTests.cs b/Homework3.Tests/MyTreadPoolTests.cs index 72fc293..ea54030 100644 --- a/Homework3.Tests/MyTreadPoolTests.cs +++ b/Homework3.Tests/MyTreadPoolTests.cs @@ -1,5 +1,4 @@ -using static Homework3.MyThreadPool; - + namespace Homework3.Tests; class Tests diff --git a/Homework3/MyThreadPool.cs b/Homework3/MyThreadPool.cs index 698641c..3e6a167 100644 --- a/Homework3/MyThreadPool.cs +++ b/Homework3/MyThreadPool.cs @@ -92,10 +92,10 @@ private void TasksPerforming() /// public class MyTask : IMyTask { - private T result; + private T? result; private MyThreadPool myThreadPool; private Exception? exception = null; - private AutoResetEvent myTaskThreadHandler = new(true); + private ManualResetEvent myTaskThreadHandler = new(true); private volatile bool isResultReady = false; /// From 2e6233704e2a0beab0ea6c876fce6d4a32a5b471 Mon Sep 17 00:00:00 2001 From: Ksenia Nigmatulina Date: Tue, 16 Apr 2024 19:58:18 +0400 Subject: [PATCH 5/7] =?UTF-8?q?=D0=9F=D0=BE=D0=BF=D1=80=D0=B0=D0=B2=D0=B8?= =?UTF-8?q?=D0=BB=D0=B0=20=D0=B7=D0=B0=D0=BC=D0=B5=D1=87=D0=B0=D0=BD=D0=B8?= =?UTF-8?q?=D1=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Homework3.Tests/MyTreadPoolTests.cs | 45 ++++++++- Homework3/IMyTask.cs | 11 ++- Homework3/MyThreadPool.cs | 136 ++++++++++++++++------------ 3 files changed, 126 insertions(+), 66 deletions(-) diff --git a/Homework3.Tests/MyTreadPoolTests.cs b/Homework3.Tests/MyTreadPoolTests.cs index ea54030..cac0c20 100644 --- a/Homework3.Tests/MyTreadPoolTests.cs +++ b/Homework3.Tests/MyTreadPoolTests.cs @@ -1,5 +1,4 @@ - -namespace Homework3.Tests; +namespace Homework3.Tests; class Tests { @@ -12,6 +11,45 @@ public void SetUp() threadPool = new(amountOfThreads); } + [Test] + public void MultiThreadThreadPoolRequest() + { + var threads = new Thread[amountOfThreads]; + var tasks = new IMyTask[amountOfThreads]; + ManualResetEvent manualResetEvent = new(false); + for (int i = 0; i < amountOfThreads; i++) + { + var localI = i; + threads[i] = new Thread(() => + { + tasks[localI] = threadPool.Submit(() => + { + manualResetEvent.WaitOne(); + Thread.Sleep(1000); + return 2 * 2; + }); + } + ); + } + + foreach (var thread in threads) + { + thread.Start(); + } + + manualResetEvent.Set(); + + foreach (var thread in threads) + { + thread.Join(); + } + + for (int i = 0; i < amountOfThreads; i++) + { + Assert.That(tasks[i].Result(), Is.EqualTo(4)); + } + } + [Test] public void ShutdownTest() { @@ -49,10 +87,9 @@ public void SubmitPlusContinueWithTest() [Test] public void ExceptionTest() { - var myTask = threadPool.Submit(() => + var myTask = threadPool.Submit(object () => { throw new InvalidOperationException(); - return 4; //without this string test doesn't work }); Assert.Throws(() => myTask!.Result()); } diff --git a/Homework3/IMyTask.cs b/Homework3/IMyTask.cs index d0350ae..0c0633f 100644 --- a/Homework3/IMyTask.cs +++ b/Homework3/IMyTask.cs @@ -1,5 +1,10 @@ -using static Homework3.MyThreadPool; +namespace Homework3; +using static Homework3.MyThreadPool; + +/// +/// interface for MyTask class. +/// public interface IMyTask { /// @@ -12,7 +17,7 @@ public interface IMyTask /// Return result of the task. /// /// - public T Result(); + public T? Result(); /// /// Return new task which operates with the previous result. @@ -20,5 +25,5 @@ public interface IMyTask /// /// /// - public MyTask ContinueWith(Func continueFunction); + public MyTask ContinueWith(Func continueFunction); } \ No newline at end of file diff --git a/Homework3/MyThreadPool.cs b/Homework3/MyThreadPool.cs index 3e6a167..63af255 100644 --- a/Homework3/MyThreadPool.cs +++ b/Homework3/MyThreadPool.cs @@ -3,34 +3,38 @@ using System; using System.Collections.Concurrent; +/// +/// MyThreadPool class is realisation of threadpool. +/// public class MyThreadPool { - private readonly AutoResetEvent threadHandler; - private readonly ConcurrentQueue taskQueque; - private readonly CancellationTokenSource cancellationTokenSource; + private readonly int timeForEndingWork = 1000; + private readonly object cancellationTokenLockObject = new(); + private readonly Thread[] threads; + private readonly AutoResetEvent threadCanWorkHandler = new(false); + private readonly ConcurrentQueue taskQueue = new(); + private readonly CancellationTokenSource cancellationTokenSource = new(); private int amountOfCompletedTasks = 0; private int amountOfWorkingThreads = 0; /// /// Creates new instance of MyThreadPool class. /// - /// + /// Max amount of working threads public MyThreadPool(int amountOfThreads) { - threadHandler = new(false); - taskQueque = new(); + threadCanWorkHandler = new(false); - cancellationTokenSource = new(); - var threads = new Thread[amountOfThreads]; + threads = new Thread[amountOfThreads]; for (int i = 0; i < amountOfThreads; i++) { - int localI = i; threads[i] = new Thread(() => { - TasksPerforming(); + PerformTask(); } - ); + ) + { IsBackground = true }; //threadpool threads are background } foreach (var thread in threads) @@ -43,70 +47,92 @@ public MyThreadPool(int amountOfThreads) /// /// Returns new task if cancellationToken wasn't requested or throws exception /// - /// - /// - /// - /// - public MyTask? Submit(Func function) + public MyTask Submit(Func function) { - if (!cancellationTokenSource.IsCancellationRequested) + lock (cancellationTokenLockObject) { - return new MyTask(this, function); + if (!cancellationTokenSource.IsCancellationRequested) + { + return new MyTask(this, function); + } } - throw new InvalidOperationException("Shutdown was requested, threadpool stoped calculating"); + throw new InvalidOperationException("Shutdown was requested, threadpool stopped calculating"); } + private void Execute(Action action) + { + taskQueue.Enqueue(action); + threadCanWorkHandler.Set(); + return; + } + + /// /// Terminates the threads: already running tasks are not interrupted, but new tasks are not accepted for execution by threads from the pool. /// public void Shutdown() { - cancellationTokenSource.Cancel(); - while (amountOfWorkingThreads != 0) + lock (cancellationTokenLockObject) + { + cancellationTokenSource.Cancel(); + } + + threadCanWorkHandler.Set(); //if all threads are on WaitOne() + + foreach (var thread in threads) { - continue; + if (!thread.Join(timeForEndingWork)) + { + thread.Interrupt(); + } } + threadCanWorkHandler.Dispose(); } - private void TasksPerforming() + private void PerformTask() { - - if (!taskQueque.IsEmpty) + while (!cancellationTokenSource.IsCancellationRequested || !taskQueue.IsEmpty) //if cancelation requested thread ends working when there are no tasks. { - var isSuccessullyRemoved = taskQueque.TryDequeue(out Action? newTask); + threadCanWorkHandler.WaitOne(); + + var isSuccessullyRemoved = taskQueue.TryDequeue(out Action? newAction); if (isSuccessullyRemoved) { - threadHandler.Set(); Interlocked.Add(ref amountOfWorkingThreads, 1); - newTask!.Invoke(); + + if (newAction == null) + { + throw new InvalidOperationException("Null action is in queue."); + } + + newAction.Invoke(); + Interlocked.Add(ref amountOfCompletedTasks, 1); Interlocked.Add(ref amountOfWorkingThreads, -1); - threadHandler.WaitOne(); } + if (!taskQueue.IsEmpty) threadCanWorkHandler.Set(); //let next take task. } + threadCanWorkHandler.Set(); //ends work, let next end. } /// /// Task for MyThreadPoolClass /// - /// public class MyTask : IMyTask { private T? result; - private MyThreadPool myThreadPool; + private readonly MyThreadPool myThreadPool; private Exception? exception = null; - private ManualResetEvent myTaskThreadHandler = new(true); - private volatile bool isResultReady = false; + private readonly ManualResetEvent isResultReadyHandler = new(false); + private bool isResultReady = false; /// /// Creates new instance of MyTask class. /// - /// - /// public MyTask(MyThreadPool myThreadPool, Func function) { this.myThreadPool = myThreadPool; - this.myThreadPool.taskQueque.Enqueue(new Action(() => + var myTaskAction = new Action(() => { try { @@ -119,53 +145,45 @@ public MyTask(MyThreadPool myThreadPool, Func function) finally { isResultReady = true; //volatile: writes all values in memory - myTaskThreadHandler.Set(); + isResultReadyHandler.Set(); } - })); - myThreadPool.TasksPerforming(); + }); + myThreadPool.Execute(myTaskAction); } /// /// Returns true, if task is completed. /// - /// public bool IsCompleted() - { - return isResultReady; - } + => isResultReady; /// - /// Return result of the task. + /// Returns result of the task. /// - /// - public T Result() + public T? Result() { - if (!isResultReady) - { - myTaskThreadHandler.Reset(); - } + isResultReadyHandler.WaitOne(); if (Volatile.Read(ref exception) == null) { return result; } - throw new AggregateException("", exception!); + throw new AggregateException("Exception from function", exception!); //is not null, cheched that above } /// /// Return new task which operates with the previous result. /// - /// - /// - /// - public MyTask ContinueWith(Func continueFunction) + public MyTask ContinueWith(Func continueFunction) { - if (!myThreadPool.cancellationTokenSource.IsCancellationRequested) + lock (myThreadPool.cancellationTokenLockObject) { - var nextFunction = new Func(() => continueFunction(Result())); - return new MyTask(myThreadPool, nextFunction); + if (!myThreadPool.cancellationTokenSource.IsCancellationRequested) + { + var nextFunction = new Func(() => continueFunction(Result())); + return new MyTask(myThreadPool, nextFunction); + } } throw new InvalidOperationException("Shutdown was requested, threadpool stoped calculating"); } } - } \ No newline at end of file From 2788e4c61c57531da3f6a9105ef978e80811b03a Mon Sep 17 00:00:00 2001 From: Ksenia Nigmatulina Date: Tue, 16 Apr 2024 20:03:35 +0400 Subject: [PATCH 6/7] volatile write --- Homework3/MyThreadPool.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Homework3/MyThreadPool.cs b/Homework3/MyThreadPool.cs index 63af255..eb9a66a 100644 --- a/Homework3/MyThreadPool.cs +++ b/Homework3/MyThreadPool.cs @@ -144,7 +144,7 @@ public MyTask(MyThreadPool myThreadPool, Func function) } finally { - isResultReady = true; //volatile: writes all values in memory + Volatile.Write(ref isResultReady, true); //writes all values in memory isResultReadyHandler.Set(); } }); From ed1d3b25ace90c88ac87467e182f02e5d8abae45 Mon Sep 17 00:00:00 2001 From: Ksenia Nigmatulina Date: Tue, 16 Apr 2024 20:05:22 +0400 Subject: [PATCH 7/7] =?UTF-8?q?=D0=98=D1=81=D0=BF=D1=80=D0=B0=D0=B2=D0=B8?= =?UTF-8?q?=D0=BB=D0=B0=20=D0=BE=D0=BF=D0=B5=D1=87=D0=B0=D1=82=D0=BA=D1=83?= =?UTF-8?q?=20=D0=B2=20=D0=BA=D0=BE=D0=BC=D0=BC=D0=B5=D0=BD=D1=82=D0=B0?= =?UTF-8?q?=D1=80=D0=B8=D0=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Homework3/MyThreadPool.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Homework3/MyThreadPool.cs b/Homework3/MyThreadPool.cs index eb9a66a..db81031 100644 --- a/Homework3/MyThreadPool.cs +++ b/Homework3/MyThreadPool.cs @@ -167,7 +167,7 @@ public bool IsCompleted() { return result; } - throw new AggregateException("Exception from function", exception!); //is not null, cheched that above + throw new AggregateException("Exception from function", exception!); //is not null, checked that above } ///