A scheduler to process a list of requests with a specific number of threads in C#.

This scheduler is not a time-based scheduler. It schedules user tasks according to a scheduling policy; the policy here is First Come, First Served (FCFS). It performs the following tasks:

  •       It decides which request to execute next.
  •       It executes the request on one of the threads of the manual thread pool, if an idle thread is available.

Here a specific number of threads is used to process client requests. When the scheduler starts, a pool of threads is created to process the pending client requests, which may be created by the software itself to meet certain requirements, sent by a network client, or polled from a database. In thread pooling, after a thread is created, it is placed in the pool and reused, so that a new thread does not have to be created for each request. If all the threads are busy, the requests remain pending. As soon as one of the threads becomes free, it handles the next client request in FIFO order.

   class Scheduler
   {
       private ManualResetEvent manualResetEvent = new ManualResetEvent(false);

       private Queue _ClientRequests;
       public Queue ClientRequests
       {
           get { return _ClientRequests; }
           set { _ClientRequests = value; }
       }

       private int _NumberofThreads = 3;
       public int NumberofThreads
       {
           get { return _NumberofThreads; }
           set { _NumberofThreads = value; }
       }

       // volatile so worker threads see the updated value without a lock
       private volatile bool IsSchedulerStarted = false;
       private Thread[] StaticThreadPool = null;

       public Scheduler()
       {
           _ClientRequests = Queue.Synchronized(new Queue());
           StaticThreadPool = new Thread[_NumberofThreads];

           for (int i = 0; i < StaticThreadPool.Length; i++)
           {
               StaticThreadPool[i] = new Thread(new ThreadStart(this.Work));
               StaticThreadPool[i].IsBackground = true;
           }
       }

       private void Work()
       {
           while (true)
           {
               ICommand request = null;
               lock (_ClientRequests.SyncRoot)
               {
                   if (_ClientRequests.Count > 0)
                   {
                       request = (ICommand)_ClientRequests.Dequeue();
                   }
                   else
                   {
                       // Reset inside the lock so a Set from Enqueue cannot be
                       // lost between the Count check and WaitOne; without this
                       // Reset the event stays signaled and the workers spin.
                       manualResetEvent.Reset();
                   }
               }
               if (request != null)
               {
                   request.Execute();
               }
               else
               {
                   if (!IsSchedulerStarted) break;
                   manualResetEvent.WaitOne();
               }
               if (!IsSchedulerStarted) break;
           }
       }

       public void Enqueue(ICommand request)
       {
           lock (_ClientRequests.SyncRoot)
           {
               _ClientRequests.Enqueue(request);
               // Signal inside the lock so the wake-up pairs with the Reset above.
               manualResetEvent.Set();
           }
       }

       public void StartScheduler()
       {
           // Set the flag before starting the threads, otherwise a worker could
           // observe it still false and exit immediately.
           IsSchedulerStarted = true;
           foreach (Thread item in StaticThreadPool)
           {
               item.Start();
           }
       }

       public void StopScheduler()
       {
           IsSchedulerStarted = false;
           // Wake any threads blocked in WaitOne so they can observe the stop
           // flag; otherwise Join below would hang forever.
           manualResetEvent.Set();
           foreach (Thread item in StaticThreadPool)
           {
               if (item != null && item.IsAlive)
               {
                   item.Join();
               }
           }
       }
   }

According to MSDN, enumerating through a collection is intrinsically not a thread-safe procedure. Even when a collection is synchronized, other threads can still modify the collection, which causes the enumerator to throw an exception. To guarantee thread safety during enumeration, you can either lock the collection during the entire enumeration or catch the exceptions resulting from changes made by other threads. To guarantee the thread safety of the queue, all operations are done through the synchronized (thread-safe) wrapper of Queue.

To improve performance, when there are no messages to process in the queue, we simply block the thread until a new message appears. According to MSDN, the WaitOne() method of ManualResetEvent blocks the current thread until the current instance receives a signal. So each worker thread calls manualResetEvent.WaitOne() to block itself when there is no request to process.
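The blocking behavior can be seen in isolation with a small sketch (the gate field name and the timings are illustrative, not part of the scheduler):

   using System;
   using System.Threading;

   class EventDemo
   {
       // Starts unsignaled, so WaitOne() blocks until Set() is called.
       static ManualResetEvent gate = new ManualResetEvent(false);

       static void Main()
       {
           Thread worker = new Thread(() =>
           {
               Console.WriteLine("worker: waiting for signal");
               gate.WaitOne();                 // blocks here
               Console.WriteLine("worker: released");
           });
           worker.Start();

           Thread.Sleep(200);                  // let the worker block first
           gate.Set();                         // releases the waiting thread
           worker.Join();
       }
   }

Note that a ManualResetEvent stays signaled until Reset() is called, so every thread that subsequently calls WaitOne() passes through immediately; that is why the scheduler must pair Set() on enqueue with Reset() when the queue is found empty.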

   interface ICommand
   {
       void Execute();
   }

   class ClientRequestType1 : ICommand
   {
       // data structure for request type 1

       #region ICommand Members

       public void Execute()
       {
           // write code to process a request of type 1
       }

       #endregion
   }

   class ClientRequestType2 : ICommand
   {
       // data structure for request type 2

       #region ICommand Members

       public void Execute()
       {
           // write code to process a request of type 2
       }

       #endregion
   }

To process each type of user request, there is a method with some parameters. Here we represent these methods using the ICommand type, encapsulating each method together with its parameters into an object. To process each type of user request, we ask the corresponding ICommand object to execute, and it knows what to do.
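As a sketch of that encapsulation, a hypothetical AddNumbersCommand below captures its two operands at construction time, so a worker thread can run it later without knowing the parameters:

   // ICommand as defined above.
   interface ICommand
   {
       void Execute();
   }

   // Hypothetical command that encapsulates a method call and its parameters.
   class AddNumbersCommand : ICommand
   {
       private readonly int _a;
       private readonly int _b;
       public int Result { get; private set; }

       public AddNumbersCommand(int a, int b)
       {
           _a = a;
           _b = b;
       }

       public void Execute()
       {
           // The parameters travel inside the object; the scheduler just calls Execute().
           Result = _a + _b;
       }
   }

A caller would simply write scheduler.Enqueue(new AddNumbersCommand(2, 3)) and let a pooled thread invoke Execute later; this is the classic Command pattern.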
