Tuesday, July 16, 2013

Producer and multi-consumer sample code using C#

To practice multi-threaded programming in C#, I wrote sample code for the producer-consumer model. I used a custom thread pool so that the workers keep running, periodically checking the work queue, until they receive an end signal.

The producer-consumer model is simple. There is a task queue. The producer puts tasks into the queue, and a consumer processes a task whenever it finds one in the queue. This code has one producer and multiple consumers.

I used the basic lock mechanism. Because the code does not use Monitor.Wait()/Monitor.Pulse() for signaling, the producer and the consumers instead poll the queue periodically, calling Sleep() between checks.


using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
// For producer and consumer threads.
using System.Threading;

namespace ProConSample
{
  /**
   * Compile-time settings for the sample: one producer generates
   * numTasks tasks and numWorkers consumers (workers) drain them.
   */
  class Conf
  {
    // When true, Util.DEBUG writes its messages to the console.
    public const bool debugMode = true;

    // Exclusive upper bound passed to Random.Next() for each task value.
    public const int maxTaskValue = 100;

    // Number of consumer threads created by the thread pool.
    public const int numWorkers = 10;

    // Total number of tasks the producer enqueues before it stops.
    public const int numTasks = 2000;
  }

  /**
   * Small helpers shared by the producer and the consumers.
   */
  class Util
  {
    /**
     * Writes a formatted line to the console, but only when
     * Conf.debugMode is enabled. The format string and arguments
     * follow the Console.WriteLine(string, object[]) conventions.
     */
    public static void DEBUG(string format, params object[] args)
    {
      bool verbose = Conf.debugMode;
      if (!verbose)
      {
        return;
      }

      Console.WriteLine(format, args);
    }
  }

  /**
   * WorkQueue is a FIFO queue of integer tasks guarded by a lock so
   * that only one thread at a time can access it.
   *
   * Every member takes aLock internally, so a single call is safe on
   * its own. The lock object stays public because callers that need
   * several operations to be atomic (for example "check Length, then
   * Dequeue") must hold aLock around the whole sequence. The C# lock
   * statement is re-entrant, so calling these members while already
   * holding aLock does not deadlock.
   */
  class WorkQueue
  {
    public readonly object aLock = new object();
    private readonly Queue<int> _queue;

    public WorkQueue()
    {
      _queue = new Queue<int>();
    }

    /**
     * Appends a task to the tail of the queue. The underlying
     * Queue<int> grows on demand, so no capacity check is made here.
     */
    public void Enqueue(int v)
    {
      lock (aLock)
      {
        _queue.Enqueue(v);
      }
    }

    /**
     * Removes and returns the task at the head of the queue.
     * Throws InvalidOperationException (from Queue<int>.Dequeue)
     * when the queue is empty; use TryDequeue to avoid the exception.
     */
    public int Dequeue()
    {
      lock (aLock)
      {
        return _queue.Dequeue();
      }
    }

    /**
     * Atomically removes the head task if there is one.
     * Returns true and stores the task in v on success; returns
     * false and sets v to 0 when the queue is empty.
     */
    public bool TryDequeue(out int v)
    {
      lock (aLock)
      {
        if (_queue.Count > 0)
        {
          v = _queue.Dequeue();
          return true;
        }
      }

      v = 0;
      return false;
    }

    /**
     * Number of tasks currently waiting. The value can be stale by
     * the time the caller uses it unless the caller holds aLock.
     */
    public int Length
    {
      get
      {
        lock (aLock)
        {
          return _queue.Count;
        }
      }
    }
  }

  /**
   * The producer generates Conf.numTasks random task values and
   * appends them, one at a time, to the shared work queue.
   */
  class Producer
  {
    // Pause (milliseconds) after each enqueue.
    private const int _sleepInterval = 1;
    private WorkQueue _queue;
    private Random _random = new Random();

    public Producer(WorkQueue q)
    {
      _queue = q;
    }

    /**
     * Thread entry point: enqueues every task and then returns.
     * Each task value is drawn from [0, Conf.maxTaskValue).
     */
    public void Start()
    {
      Util.DEBUG("Producer started");

      int produced = 0;
      while (produced < Conf.numTasks)
      {
        int task = _random.Next(Conf.maxTaskValue);

        // Hold the queue lock only for the enqueue itself.
        lock (_queue.aLock)
        {
          _queue.Enqueue(task);
        }

        Util.DEBUG("[Producer] Enqueued {0}", task);

        // Yield after every task so this loop does not monopolise
        // a core; on a single-core machine the consumers would
        // otherwise get little chance to run until it finishes.
        Thread.Sleep(_sleepInterval);

        produced++;
      }

      Util.DEBUG("Producer ended");
    }
  }

  /**
   * A consumer polls the shared work queue. Whenever a task is
   * available it dequeues and processes it; it stops once the end
   * event has been set and the queue was found empty.
   */
  class Consumer
  {
    // Pause (milliseconds) between two polls of the queue.
    private const int _sleepInterval = 1;

    private readonly WorkQueue _queue;

    // Written by the controlling thread and read by the worker
    // thread, hence volatile so the write is reliably observed.
    private volatile bool _evEnd;

    /** Number of tasks this consumer has processed so far. */
    public int NumTaskDone
    { get; set; }

    /**
     * End-event flag. Set it to true to ask the consumer to stop
     * once it finds the queue empty.
     */
    public bool EvEnd
    {
      get { return _evEnd; }
      set { _evEnd = value; }
    }

    /**
     * Creates a consumer bound to the given queue.
     * Throws ArgumentNullException when q is null, so the mistake
     * surfaces at construction rather than on the worker thread.
     */
    public Consumer(WorkQueue q)
    {
      if (q == null)
      {
        throw new ArgumentNullException("q");
      }

      _queue = q;
      _evEnd = false;
      NumTaskDone = 0;
    }

    /**
     * Thread entry point: loops until the end event is set and no
     * task was found in the queue.
     */
    public void Start()
    {
      Util.DEBUG("Consumer started");

      while (true)
      {
        int v = 0;
        bool gotTask = false;

        // Check-and-dequeue must be atomic with respect to the other
        // consumers, so both steps run under the queue lock.
        lock (_queue.aLock)
        {
          if (_queue.Length > 0)
          {
            v = _queue.Dequeue();
            gotTask = true;
          }
        }

        if (gotTask)
        {
          // Process a task.
          Util.DEBUG("[Worker] Dequeued {0}", v);
          NumTaskDone++;
        }
        else if (EvEnd)
        {
          // Ending condition: nothing was left in the queue and the
          // end event has been received.
          break;
        }

        // Sleep between polls so the loop does not spin on a core
        // and starve the producer (or other consumers) of CPU time.
        Thread.Sleep(_sleepInterval);
      }

      Util.DEBUG("Consumer ended: processed {0} tasks", NumTaskDone);
    }
  }

  /**
   * A fixed-size pool that owns Conf.numWorkers consumers and the
   * threads that run them. Note that inside this namespace the name
   * refers to this class, not to System.Threading.ThreadPool.
   */
  class ThreadPool
  {
    Thread[] threads = new Thread[Conf.numWorkers];
    Consumer[] consumers = new Consumer[Conf.numWorkers];

    /**
     * Creates one consumer per worker slot, all sharing queue q,
     * and a (not yet started) thread for each consumer.
     */
    public ThreadPool(WorkQueue q)
    {
      int slot = 0;
      while (slot < Conf.numWorkers)
      {
        Consumer worker = new Consumer(q);
        consumers[slot] = worker;
        threads[slot] = new Thread(new ThreadStart(worker.Start));
        slot++;
      }
    }

    /** Starts every worker thread, in slot order. */
    public void Start()
    {
      foreach (Thread worker in threads)
      {
        worker.Start();
      }
    }

    /** Raises the end event on every consumer. */
    public void SetEndEvent()
    {
      foreach (Consumer worker in consumers)
      {
        worker.EvEnd = true;
      }
    }

    /** Blocks until every worker thread has finished. */
    public void Join()
    {
      foreach (Thread worker in threads)
      {
        worker.Join();
      }
    }
  }

  /**
   * Program entry point: wires one producer and a pool of consumers
   * to a shared work queue and runs them to completion.
   */
  class ProCon
  {
    static void Main(string[] args)
    {
      // The queue shared by the producer and all consumers.
      var sharedQueue = new WorkQueue();

      // One producer, running on its own thread.
      var taskSource = new Producer(sharedQueue);
      var producerThread = new Thread(new ThreadStart(taskSource.Start));

      // The consumer pool.
      var workers = new ThreadPool(sharedQueue);

      // Launch the producer first, then the consumers.
      producerThread.Start();
      workers.Start();

      // Once producing has finished, tell the consumers they may
      // stop as soon as the queue is drained.
      producerThread.Join();
      workers.SetEndEvent();

      /**
       * TODO: Is there any method to wait until all threads in a
       * specific list have ended?
       */
      // Wait for every consumer thread to end.
      workers.Join();
    }
  }
}