diff --git a/Homework3.Tests/Homework3.Tests.csproj b/Homework3.Tests/Homework3.Tests.csproj new file mode 100644 index 0000000..4a01125 --- /dev/null +++ b/Homework3.Tests/Homework3.Tests.csproj @@ -0,0 +1,23 @@ + + + + net7.0 + enable + enable + + false + true + + + + + + + + + + + + + + diff --git a/Homework3.Tests/MyTreadPoolTests.cs b/Homework3.Tests/MyTreadPoolTests.cs new file mode 100644 index 0000000..cac0c20 --- /dev/null +++ b/Homework3.Tests/MyTreadPoolTests.cs @@ -0,0 +1,132 @@ +namespace Homework3.Tests; + +class Tests +{ + readonly int amountOfThreads = 5; + MyThreadPool threadPool; + + [SetUp] + public void SetUp() + { + threadPool = new(amountOfThreads); + } + + [Test] + public void MultiThreadThreadPoolRequest() + { + var threads = new Thread[amountOfThreads]; + var tasks = new IMyTask[amountOfThreads]; + ManualResetEvent manualResetEvent = new(false); + for (int i = 0; i < amountOfThreads; i++) + { + var localI = i; + threads[i] = new Thread(() => + { + tasks[localI] = threadPool.Submit(() => + { + manualResetEvent.WaitOne(); + Thread.Sleep(1000); + return 2 * 2; + }); + } + ); + } + + foreach (var thread in threads) + { + thread.Start(); + } + + manualResetEvent.Set(); + + foreach (var thread in threads) + { + thread.Join(); + } + + for (int i = 0; i < amountOfThreads; i++) + { + Assert.That(tasks[i].Result(), Is.EqualTo(4)); + } + } + + [Test] + public void ShutdownTest() + { + var myTask1 = threadPool.Submit(() => 2 * 2)!.ContinueWith(x => x.ToString()); + Thread.Sleep(1000); + threadPool.Shutdown(); + Thread.Sleep(100); + Assert.Throws(() => threadPool.Submit(() => 2 * 2)!.ContinueWith(x => x.ToString())); + } + + [Test] + public void SubmitTest() + { + var myTask = threadPool.Submit(() => 2 * 2)!; + Thread.Sleep(1000); + Assert.Multiple(() => + { + Assert.That(myTask.IsCompleted(), Is.EqualTo(true)); + Assert.That(myTask.Result(), Is.EqualTo(4)); + }); + } + + [Test] + public void SubmitPlusContinueWithTest() + { + var myTask = threadPool.Submit(() => 2 * 2)!.ContinueWith(x => x.ToString()); + Thread.Sleep(1000); + Assert.Multiple(() => + { + Assert.That(myTask.IsCompleted(), Is.EqualTo(true)); + Assert.That(myTask.Result(), Is.EqualTo("4")); + }); + } + + [Test] + public void ExceptionTest() + { + var myTask = threadPool.Submit(object () => + { + throw new InvalidOperationException(); + }); + Assert.Throws(() => myTask!.Result()); + } + + [Test] + public void ThreeContinueWithTest() + { + var myTask = threadPool.Submit(() => 2 * 2)!.ContinueWith(x => x.ToString()).ContinueWith(y => y + " "); + Thread.Sleep(1000); + Assert.Multiple(() => + { + Assert.That(myTask.IsCompleted(), Is.EqualTo(true)); + Assert.That(myTask.Result(), Is.EqualTo("4 ")); + }); + } + + [Test] + public void ThereNThreadsInMyThreadPoolTest() + { + var threadsId = new int[amountOfThreads]; + for (int i = 0; i < amountOfThreads; i++) + { + int localI = i; + threadPool.Submit(() => + { + threadsId[localI] = Environment.CurrentManagedThreadId; + Thread.Sleep(1000); + return 2 * 2; + }); + } + Dictionary dict = new(); + for (int i = 0; i < amountOfThreads; i++) + { + dict.Add(i, threadsId[i]); + } + var values = dict.Select(x => x.Value).ToArray(); + + Assert.That(values.Count, Is.EqualTo(amountOfThreads)); + } +} \ No newline at end of file diff --git a/Homework3.Tests/Usings.cs b/Homework3.Tests/Usings.cs new file mode 100644 index 0000000..9a28bd8 --- /dev/null +++ b/Homework3.Tests/Usings.cs @@ -0,0 +1 @@ +global using NUnit.Framework; diff --git a/Homework3/EternalTaskException.cs b/Homework3/EternalTaskException.cs new file mode 100644 index 0000000..7c1c315 --- /dev/null +++ b/Homework3/EternalTaskException.cs @@ -0,0 +1,16 @@ + +/// +/// Thrown if the task hasn't been completed for some exact time after Shutdown. +/// +public class EternalTaskException : Exception +{ + /// + /// Initializes a new instance of the class. + /// + /// Error message. + public EternalTaskException(string message) + : base(message) + { + } +} + diff --git a/Homework3/Homework3.csproj b/Homework3/Homework3.csproj new file mode 100644 index 0000000..4658cbf --- /dev/null +++ b/Homework3/Homework3.csproj @@ -0,0 +1,9 @@ + + + + net7.0 + enable + enable + + + diff --git a/Homework3/Homework3.sln b/Homework3/Homework3.sln new file mode 100644 index 0000000..72dec81 --- /dev/null +++ b/Homework3/Homework3.sln @@ -0,0 +1,31 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 16 +VisualStudioVersion = 25.0.1706.3 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Homework3", "Homework3.csproj", "{6CD95AEA-46EF-4C7C-B3E3-C4B948912D59}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Homework3.Tests", "..\Homework3.Tests\Homework3.Tests.csproj", "{19B0F252-AE39-4460-A8C3-ACD58D24B58B}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {6CD95AEA-46EF-4C7C-B3E3-C4B948912D59}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {6CD95AEA-46EF-4C7C-B3E3-C4B948912D59}.Debug|Any CPU.Build.0 = Debug|Any CPU + {6CD95AEA-46EF-4C7C-B3E3-C4B948912D59}.Release|Any CPU.ActiveCfg = Release|Any CPU + {6CD95AEA-46EF-4C7C-B3E3-C4B948912D59}.Release|Any CPU.Build.0 = Release|Any CPU + {19B0F252-AE39-4460-A8C3-ACD58D24B58B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {19B0F252-AE39-4460-A8C3-ACD58D24B58B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {19B0F252-AE39-4460-A8C3-ACD58D24B58B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {19B0F252-AE39-4460-A8C3-ACD58D24B58B}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {44148E15-051F-4366-A340-D677BCF6B98B} + EndGlobalSection +EndGlobal diff --git a/Homework3/IMyTask.cs b/Homework3/IMyTask.cs new file mode 100644 index 0000000..0c0633f --- /dev/null +++ b/Homework3/IMyTask.cs @@ -0,0 +1,29 @@ +namespace Homework3; + +using static Homework3.MyThreadPool; + +/// +/// interface for MyTask class. +/// +public interface IMyTask +{ + /// + /// Returns true, if task is completed. + /// + /// + public bool IsCompleted(); + + /// + /// Return result of the task. + /// + /// + public T? Result(); + + /// + /// Return new task which operates with the previous result. + /// + /// + /// + /// + public MyTask ContinueWith(Func continueFunction); +} \ No newline at end of file diff --git a/Homework3/MyThreadPool.cs b/Homework3/MyThreadPool.cs new file mode 100644 index 0000000..db81031 --- /dev/null +++ b/Homework3/MyThreadPool.cs @@ -0,0 +1,189 @@ +namespace Homework3; + +using System; +using System.Collections.Concurrent; + +/// +/// MyThreadPool class is realisation of threadpool. +/// +public class MyThreadPool +{ + private readonly int timeForEndingWork = 1000; + private readonly object cancellationTokenLockObject = new(); + private readonly Thread[] threads; + private readonly AutoResetEvent threadCanWorkHandler = new(false); + private readonly ConcurrentQueue taskQueue = new(); + private readonly CancellationTokenSource cancellationTokenSource = new(); + private int amountOfCompletedTasks = 0; + private int amountOfWorkingThreads = 0; + + /// + /// Creates new instance of MyThreadPool class. + /// + /// Max amount of working threads + public MyThreadPool(int amountOfThreads) + { + threadCanWorkHandler = new(false); + + threads = new Thread[amountOfThreads]; + + for (int i = 0; i < amountOfThreads; i++) + { + threads[i] = new Thread(() => + { + PerformTask(); + } + ) + { IsBackground = true }; //threadpool threads are background + } + + foreach (var thread in threads) + { + thread.Start(); + } + + } + + /// + /// Returns new task if cancellationToken wasn't requested or throws exception + /// + public MyTask Submit(Func function) + { + lock (cancellationTokenLockObject) + { + if (!cancellationTokenSource.IsCancellationRequested) + { + return new MyTask(this, function); + } + } + throw new InvalidOperationException("Shutdown was requested, threadpool stopped calculating"); + } + + private void Execute(Action action) + { + taskQueue.Enqueue(action); + threadCanWorkHandler.Set(); + return; + } + + + /// + /// Terminates the threads: already running tasks are not interrupted, but new tasks are not accepted for execution by threads from the pool. + /// + public void Shutdown() + { + lock (cancellationTokenLockObject) + { + cancellationTokenSource.Cancel(); + } + + threadCanWorkHandler.Set(); //if all threads are on WaitOne() + + foreach (var thread in threads) + { + if (!thread.Join(timeForEndingWork)) + { + thread.Interrupt(); + } + } + threadCanWorkHandler.Dispose(); + } + + private void PerformTask() + { + while (!cancellationTokenSource.IsCancellationRequested || !taskQueue.IsEmpty) //if cancelation requested thread ends working when there are no tasks. + { + threadCanWorkHandler.WaitOne(); + + var isSuccessullyRemoved = taskQueue.TryDequeue(out Action? newAction); + if (isSuccessullyRemoved) + { + Interlocked.Add(ref amountOfWorkingThreads, 1); + + if (newAction == null) + { + throw new InvalidOperationException("Null action is in queue."); + } + + newAction.Invoke(); + + Interlocked.Add(ref amountOfCompletedTasks, 1); + Interlocked.Add(ref amountOfWorkingThreads, -1); + } + if (!taskQueue.IsEmpty) threadCanWorkHandler.Set(); //let next take task. + } + threadCanWorkHandler.Set(); //ends work, let next end. + } + + /// + /// Task for MyThreadPoolClass + /// + public class MyTask : IMyTask + { + private T? result; + private readonly MyThreadPool myThreadPool; + private Exception? exception = null; + private readonly ManualResetEvent isResultReadyHandler = new(false); + private bool isResultReady = false; + + /// + /// Creates new instance of MyTask class. + /// + public MyTask(MyThreadPool myThreadPool, Func function) + { + this.myThreadPool = myThreadPool; + var myTaskAction = new Action(() => + { + try + { + result = function.Invoke(); + } + catch (Exception e) + { + exception = e; + } + finally + { + Volatile.Write(ref isResultReady, true); //writes all values in memory + isResultReadyHandler.Set(); + } + }); + myThreadPool.Execute(myTaskAction); + } + + /// + /// Returns true, if task is completed. + /// + public bool IsCompleted() + => isResultReady; + + /// + /// Returns result of the task. + /// + public T? Result() + { + isResultReadyHandler.WaitOne(); + if (Volatile.Read(ref exception) == null) + { + return result; + } + throw new AggregateException("Exception from function", exception!); //is not null, checked that above + } + + /// + /// Return new task which operates with the previous result. + /// + public MyTask ContinueWith(Func continueFunction) + { + lock (myThreadPool.cancellationTokenLockObject) + { + if (!myThreadPool.cancellationTokenSource.IsCancellationRequested) + { + var nextFunction = new Func(() => continueFunction(Result())); + return new MyTask(myThreadPool, nextFunction); + } + } + throw new InvalidOperationException("Shutdown was requested, threadpool stoped calculating"); + } + } +} \ No newline at end of file