Producer and consumer model is simple. There is a task queue. Producer puts tasks into the queue. And consumer process a task when it finds in the queue. The code has one producer and multiple consumers.
I used basic lock mechanism. Because there's no Monitor()/Pulse() here, producer and consumer should check the queue timely with Sleep() instead.
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; // For producer and consumer threads. using System.Threading; namespace ProConSample { /** * In this sample, 2,000 tasks will be generated by a producer * and 10 consumers(workers) will process them. */ class Conf { public const int numTasks = 2000; public const int numWorkers = 10; public const int maxTaskValue = 100; public const bool debugMode = true; } class Util { public static void DEBUG(string format, params object[] args) { if (Conf.debugMode) { Console.WriteLine(format, args); } } } /** * WorkQueue has a lock so that only one thread at a time can * access to it. */ class WorkQueue { public readonly object aLock = new object(); private Queue<int> _queue; public WorkQueue() { _queue = new Queue<int>(); } public void Enqueue(int v) { // TODO: exception handling for _queue.Length reaches Max value. _queue.Enqueue(v); } public int Dequeue() { // TODO: exception handling for _queue is empty return _queue.Dequeue(); } public int Length { get { return _queue.Count; } } } /** * Producer puts tasks into workQueue. */ class Producer { private Random _random = new Random(); private WorkQueue _queue; private const int _sleepInterval = 1; public Producer(WorkQueue q) { _queue = q; } public void Start() { Util.DEBUG("Producer started"); for (int i = 0; i < Conf.numTasks; ++i) { int v = _random.Next(Conf.maxTaskValue); lock(_queue.aLock) { _queue.Enqueue(v); } Util.DEBUG("[Producer] Enqueued {0}", v); /** * If you don't sleep at all, this function will occupy * a core till it ends. If this happens in a single core * machine, consumers can not start its routine until * the producer ends its work. */ Thread.Sleep(_sleepInterval); } Util.DEBUG("Producer ended"); } } /** * Consumers check workQueue. If there's a task, consumer will * dequeue a task and will process with it. */ class Consumer { private WorkQueue _queue; private int _taskLeft; public int NumTaskDone { get; set; } public bool EvEnd { get; set; } private const int _sleepInterval = 1; public Consumer(WorkQueue q) { _queue = q; _taskLeft = 0; EvEnd = false; NumTaskDone = 0; } public void Start() { Util.DEBUG("Consumer started"); while (true) { int v = 0; // Check if we have a task to do lock (_queue.aLock) { _taskLeft = _queue.Length; if (_taskLeft > 0) { v = _queue.Dequeue(); } } // Process a task. if (_taskLeft > 0) { Util.DEBUG("[Worker] Dequeued {0}", v); NumTaskDone++; } /** * Ending condition: * No work left in the queue and ending event received. */ if (EvEnd && _taskLeft == 0) { break; } /** * If consumer doesn't sleep here, it will hold a core. * If consumers hold all cores so that the producer has * no chance to get core to enqueue, deadlock occurs. */ Thread.Sleep(_sleepInterval); } Util.DEBUG("Consumer ended: processed {0} tasks", NumTaskDone); } } /** * ThreadPool which contains consumers and threads in the list. */ class ThreadPool { Consumer[] consumers = new Consumer[Conf.numWorkers]; Thread[] threads = new Thread[Conf.numWorkers]; public ThreadPool(WorkQueue q) { for (int i = 0; i < Conf.numWorkers; ++i) { consumers[i] = new Consumer(q); threads[i] = new Thread(new ThreadStart( consumers[i].Start )); } } public void Start() { for (int i = 0; i < Conf.numWorkers; ++i) { threads[i].Start(); } } public void SetEndEvent() { for (int i = 0; i < Conf.numWorkers; ++i) { consumers[i].EvEnd = true; } } public void Join() { for (int i = 0; i < Conf.numWorkers; ++i) { threads[i].Join(); } } } class ProCon { static void Main(string[] args) { // Create a work queue. WorkQueue workQueue = new WorkQueue(); // Create a producer. Producer producer = new Producer(workQueue); Thread tProducer = new Thread(new ThreadStart(producer.Start)); // Create consumers. ThreadPool tConsumers = new ThreadPool(workQueue); // Start producer and consumers tProducer.Start(); tConsumers.Start(); // If producing ends, set an end event signal. tProducer.Join(); tConsumers.SetEndEvent(); /** * TODO: Is there any method to wait all thread in a specific * list ended? */ // Join consumers thread ended tConsumers.Join(); } } }
No comments:
Post a Comment