Last week I came across a new problem that took me a while to solve, and I want to share my experience with you.
The problem
I had a console application in which one thread was adding Jobs to a list of Jobs, and another thread was iterating over that list to execute them.
ConcurrentQueue<Job> _queue = new ConcurrentQueue<Job>();
BlockingCollection<Job> _jobs = new BlockingCollection<Job>(_queue);
var job1 = new Job(1);
var job2 = new Job(2);
var job3 = new Job(3);
_jobs.Add(job1);
_jobs.Add(job2);
_jobs.Add(job3);
Here is a simplified version of the Job class:
public class Job
{
    public delegate void JobFinishedHandler(object myObject, JobFinishedArgs myArgs);
    public event JobFinishedHandler OnJobFinished;

    public int JobId { get; set; }

    public Job(int jobId)
    {
        this.JobId = jobId;
    }

    public void Execute()
    {
        Console.WriteLine($"Executing Job {this.JobId}");

        // Simulate work that takes a random amount of time
        Thread.Sleep(new Random().Next(1000, 3000));

        // Raise the event only if there is at least one subscriber
        OnJobFinished?.Invoke(this, new JobFinishedArgs("Job Finished!"));
    }
}
public class JobFinishedArgs : EventArgs
{
    private readonly string _message;

    public string Message { get { return _message; } }

    public JobFinishedArgs(string message)
    {
        this._message = message;
    }
}
The initial code that iterated the List of Jobs was the following:
var thread = new Thread(() =>
{
    while (!_jobs.IsCompleted)
    {
        var innerThread = new Thread(() =>
        {
            Job job = null;
            try
            {
                // Take() blocks until a job is available and throws
                // InvalidOperationException if the collection is marked complete
                job = _jobs.Take();
            }
            catch (InvalidOperationException) { }

            job?.Execute();
        })
        {
            // This is important as it allows the process to exit
            // while this thread is running
            IsBackground = true
        };
        innerThread.Start();
    }
})
{
    // This is important as it allows the process to exit
    // while this thread is running
    IsBackground = true
};
thread.Start();
The problem with this approach was that the jobs execute in parallel, which means that sometimes job3 will finish before the previously started jobs, something I didn't want.
I needed all jobs to execute sequentially one after the other.
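To make the problem visible, imagine temporarily subscribing a simple logging handler to each job's OnJobFinished event (hypothetical logging, not part of the original code); a typical run could then look like this:

// Hypothetical logging, only to show the completion order
job1.OnJobFinished += (sender, args) => Console.WriteLine("Job 1 finished");
job2.OnJobFinished += (sender, args) => Console.WriteLine("Job 2 finished");
job3.OnJobFinished += (sender, args) => Console.WriteLine("Job 3 finished");

// Possible output (the completion order varies because of the random sleep):
// Executing Job 1
// Executing Job 2
// Executing Job 3
// Job 3 finished
// Job 1 finished
// Job 2 finished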
While I was trying to find an elegant solution to my problem I came across ManualResetEvent, which the Microsoft docs describe as follows:
Notifies one or more waiting threads that an event has occurred. This class cannot be inherited.
It was exactly what I needed.
Armed with this new knowledge, my plan was the following:
I will use ManualResetEventSlim
Provides a slimmed down version of ManualResetEvent.
Use the constructor ManualResetEventSlim(Boolean)
Initializes a new instance of the ManualResetEventSlim class with a Boolean value indicating whether to set the initial state to signaled.
In my case I will initialize it to true because I want to execute the first job immediately.
Use the Wait() method of ManualResetEventSlim inside the worker thread's loop
Blocks the current thread until the current ManualResetEventSlim is set.
And immediately call the Reset() method of ManualResetEventSlim
Sets the state of the event to nonsignaled, which causes threads to block.
to stop any other Job from being executed until the current one has finished.
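Before the refactored version, here is a tiny standalone sketch of how Wait(), Reset() and Set() cooperate between two threads (illustrative only, not code from the original post):

using System;
using System.Threading;

// Start signaled so the worker runs its first step immediately
var gate = new ManualResetEventSlim(true);

var worker = new Thread(() =>
{
    for (int i = 1; i <= 3; i++)
    {
        gate.Wait();   // block until the event is signaled
        gate.Reset();  // back to nonsignaled, so the next iteration waits
        Console.WriteLine($"Step {i} running");
    }
});
worker.Start();

// The main thread releases the worker one step at a time
Thread.Sleep(500);
gate.Set();
Thread.Sleep(500);
gate.Set();

worker.Join();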
So I refactored the code as follows:
// Set the ManualResetEventSlim to true because I want to start
// executing the jobs immediately, I don't want the thread to wait
ManualResetEventSlim _canExecute = new ManualResetEventSlim(true);

ConcurrentQueue<Job> _queue = new ConcurrentQueue<Job>();
BlockingCollection<Job> _jobs = new BlockingCollection<Job>(_queue);

var thread = new Thread(() =>
{
    while (!_jobs.IsCompleted)
    {
        var innerThread = new Thread(() =>
        {
            Job job = null;

            // Now I wait for the ManualResetEvent to be set to true
            _canExecute.Wait();

            // and immediately Reset it so that the next thread will pause
            // on the line above and wait for the ManualResetEvent
            // to be set to true again
            _canExecute.Reset();

            try
            {
                job = _jobs.Take();
            }
            catch (InvalidOperationException) { }

            job?.Execute();
        })
        {
            // This is important as it allows the process to exit
            // while this thread is running
            IsBackground = true
        };
        innerThread.Start();
    }
})
{
    // This is important as it allows the process to exit
    // while this thread is running
    IsBackground = true
};
thread.Start();
So far with the above changes only the first Job will be executed.
In order to continue with the rest of the Jobs, we need to use the Set() method of ManualResetEventSlim:
Sets the state of the event to signaled, which allows one or more threads waiting on the event to proceed.
and we do that by subscribing to the OnJobFinished event and setting the ManualResetEventSlim to true.
var job1 = new Job(1);
var job2 = new Job(2);
var job3 = new Job(3);
job1.OnJobFinished += (sender, args) => _canExecute.Set();
job2.OnJobFinished += (sender, args) => _canExecute.Set();
job3.OnJobFinished += (sender, args) => _canExecute.Set();
_jobs.Add(job1);
_jobs.Add(job2);
_jobs.Add(job3);
Now our Jobs execute sequentially, one after the other!
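One small note that is not shown in the snippets above: _jobs.IsCompleted only becomes true after CompleteAdding() has been called and the collection has been emptied, so if you want the consuming thread's while loop to exit on its own, you can mark the collection as complete once the last job has been queued:

// Signal that no more jobs will be added; once the remaining jobs have
// been taken, _jobs.IsCompleted becomes true and the while loop exits.
_jobs.CompleteAdding();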
You can find the full code on my GitHub
This post was written with love ❤️
Top comments (12)
If you don't mind me asking, why aren't you using Tasks?
I was using Tasks but then I read this Async Guidance from the .NET Core team github.com/davidfowl/AspNetCoreDia... and refactored my code.
Ah great advice there.
Just stumbled upon this,
If I interpret it correctly, starting a task with the LongRunning flag will just do the same thing: create a new thread.
Maybe this is different in .NET Core, but it will then also apply to .NET 5.0.
Link to the source code below.
BR Andre
github.com/dotnet/runtime/blob/851...
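For reference, a minimal sketch of what that option looks like in use (illustrative only, not code from the post):

// TaskCreationOptions.LongRunning hints to the default scheduler that this
// delegate is long-running, so it gets a dedicated thread rather than a
// thread-pool thread.
var consumer = Task.Factory.StartNew(() =>
{
    while (!_jobs.IsCompleted)
    {
        // same consuming logic as in the article
    }
}, TaskCreationOptions.LongRunning);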
Sorry, I don't understand: if you need to wait for each method, why are you creating threads?
My thought exactly, I could understand if it was a UI app and he didn't want to block the UI thread, but it's a console app...
Seems like a classic case of over-engineering, but I am open to any explanation of course.
Hi @lordwabbit, actually the main thread of the console is listening to a queue and, based on some conditions, it creates jobs that need to be executed in sequential order. The only way I could know that a job was finished was by listening to the JobFinished event. That is the reason I used ManualResetEventSlim and background threads for the process.
I agree that the example was not very successful, since I could not reproduce the whole real-life case I dealt with.
I am always open for suggestions :)
But everything is sequential. You wait for input, determine if it should be processed, create a series of tasks to perform which need to be executed in order, and then execute them in order. The creation of threads just complicates the process with absolutely no benefit. It's not even taking advantage of multiple CPUs. If you were going to execute entire sets of jobs at the same time from the queue, then the threading would need to be higher up in the architecture. But I suppose if it was just to test some proof-of-concept code then it doesn't really matter. I would just like to mention that the reason I even brought it up is that I have a horrible habit of over-engineering, so I tend to keep an eye out for it in my own code. Sometimes KISS is actually better.
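For comparison, the simpler single-consumer shape being suggested here could look something like this (a sketch, not code from the post):

// Everything sequential on one thread: GetConsumingEnumerable() yields jobs
// in order, blocking while it waits for new ones, and completes once
// CompleteAdding() has been called.
foreach (var job in _jobs.GetConsumingEnumerable())
{
    job.Execute();
}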
The point of the article isn't to solve every instantiation of this problem. He intentionally slimmed down his class so that you can see the pattern and determine whether you should use the same pattern in your own use case.
Thanks @rickystam, I have a UI app where I am implementing it successfully. :)
I am really happy I could help :)
This saved my solution. Thank you.
I'm glad you found this helpful :)