A scheduler to process a list of requests by a specific number of threads in c#.
This scheduler is not a time-based scheduler. It schedules the user tasks according to scheduling policy; scheduling policy is First Come First Serve. It performs the following tasks:
- It decides which request to execute next.
- Execute the request by one of the threads of manual thread pool if there is an idle thread in manual thread pool.
Here a specific number of threads is used to process the clients request. When Scheduler starts, A pool of threads is created to process pending requests of client, which may be created by software to meet certain requirements , sent by some network client , polled from database . In thread pooling, after a thread is created, it is placed in the pool and it is used over again so that a new thread does not have to be created. If all the threads are being used, the requests remain in pending. As soon as one of the threads becomes free, it handles the next client request in FIFO manner.
1: class Scheduler
2: {
3:
4: private ManualResetEvent manualResetEvent = new ManualResetEvent(false);
5:
6: private Queue _ClientRequests;
7: public Queue ClientRequests
8: {
9: get { return _ClientRequests; }
10: set { _ClientRequests = value; }
11: }
12:
13: private int _NumberofThreads = 3;
14: public int NumberofThreads
15: {
16: get { return _NumberofThreads; }
17: set { _NumberofThreads = value; }
18: }
19:
20: private bool IsSchedulerStarted = false;
21: private Thread[] StaticThreadPool = null;
22:
23: public Scheduler()
24: {
25: _ClientRequests = Queue.Synchronized(new Queue());
26: StaticThreadPool = new Thread[_NumberofThreads];
27:
28: for (int i = 0; i < StaticThreadPool.Length; i++)
29: {
30: StaticThreadPool[i] = new Thread(new ThreadStart(this.Work));
31: StaticThreadPool[i].IsBackground = true;
32: }
33: }
34:
35: private void Work()
36: {
37: while (true)
38: {
39: ICommand request = null;
40: lock (_ClientRequests.SyncRoot)
41: {
42: if (_ClientRequests.Count > 0)
43: {
44: request = (ICommand)_ClientRequests.Dequeue();
45: }
46: }
47: if (request != null)
48: request.Execute();
49: else
50: manualResetEvent.WaitOne();
51: if (!IsSchedulerStarted) break;
52: }
53: }
54:
55: public void Enqueue(ICommand request)
56: {
57: lock (_ClientRequests.SyncRoot)
58: {
59: _ClientRequests.Enqueue(request);
60: }
61: manualResetEvent.Set();
62: }
63:
64: public void StartScheduler()
65: {
66: foreach (Thread item in StaticThreadPool)
67: {
68: item.Start();
69: }
70: IsSchedulerStarted = true;
71: }
72:
73: public void StopScheduler()
74: {
75: IsSchedulerStarted = false;
76: foreach (Thread item in StaticThreadPool)
77: {
78: if (item != null)
79: {
80: if (item.IsAlive)
81: {
82: item.Join();
83: }
84: }
85: }
86: }
87: }
According to MSDN, Enumerating through a collection is intrinsically not a thread-safe procedure. Even when a collection is synchronized, other threads can still modify the collection, which causes the enumerator to throw an exception. To guarantee thread safety during enumeration, you can either lock the collection during the entire enumeration or catch the exceptions resulting from changes made by other threads. To guarantee the thread safety of the queue, all operations is done through the synchronized (thread safe) wrapper of queue.
To improve performance, when we do not have any messages to process in the queue, we simply block the thread until a new message appears. According to MSDN, WaitOne( ) method of ManualResetEvent blocks the current thread until the current instance receives a signal. So in each working thread, we call manualResetEvent.WaitOne to block each working thread when there is no request to process.
1: interface ICommand
2: {
3: void Execute();
4: }
5:
6: class ClientRequestType1 : ICommand
7: {
8: // data structure for requst type 1
9:
10: #region ICommand Members
11:
12: public void Execute()
13: {
14: //write code to process a requet type 1
15: }
16:
17: #endregion
18: }
19:
20:
21:
22: class ClientRequestType2 : ICommand
23: {
24: // data structure for requst type 1
25:
26: #region ICommand Members
27:
28: public void Execute()
29: {
30: //write code to process a requet type 1
31: }
32:
33: #endregion
34: }
To process each type of user request, there is a method with some parameters. Here we represent these methods using ICommand type to encapsulate each method with its parameters into an object. To process each type of user request, we ask the corresponding Icommand object to execute and it knows what to do.