Tuesday, July 16, 2013

Producer and multi-consumer sample code using C#

To practice writing multithreaded code in C#, I wrote a sample of the producer-consumer model. I used a custom thread pool whose workers check the work queue periodically and do not exit until they receive an end signal.

The producer-consumer model is simple. There is a task queue. The producer puts tasks into the queue, and a consumer processes a task whenever it finds one there. This code has one producer and multiple consumers.

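I used the basic lock mechanism. Because I don't use Monitor.Wait()/Monitor.Pulse() here, nothing wakes a consumer when a task arrives, so the producer and consumers sleep briefly with Thread.Sleep() between queue accesses and the consumers poll the queue instead.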


using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
// For producer and consumer threads.
using System.Threading;

namespace ProConSample
{
  /**
   * In this sample, 2,000 tasks will be generated by a producer
   * and 10 consumers(workers) will process them.
   */
  class Conf
  {
    public const int numTasks = 2000;
    public const int numWorkers = 10;
    public const int maxTaskValue = 100;
    public const bool debugMode = true;
  }

  class Util
  {
    public static void DEBUG(string format, params object[] args)
    {
      if (Conf.debugMode)
      {
        Console.WriteLine(format, args);
      }
    }
  }

  /**
   * WorkQueue exposes a lock object (aLock). Callers must hold it
   * while accessing the queue so that only one thread at a time
   * touches it.
   */
  class WorkQueue
  {
    public readonly object aLock = new object();
    private Queue<int> _queue;

    public WorkQueue()
    {
      _queue = new Queue<int>();
    }

    public void Enqueue(int v)
    {
      // TODO: handle the case where _queue.Count reaches a maximum size.
      _queue.Enqueue(v);
    }

    public int Dequeue()
    {
      // TODO: handle the case where _queue is empty.
      return _queue.Dequeue();
    }

    public int Length
    { get { return _queue.Count; } }
  }

  /**
   * Producer puts tasks into workQueue.
   */
  class Producer
  {
    private Random _random = new Random();
    private WorkQueue _queue;
    private const int _sleepInterval = 1;

    public Producer(WorkQueue q)
    {
      _queue = q;
    }

    public void Start()
    {
      Util.DEBUG("Producer started");

      for (int i = 0; i < Conf.numTasks; ++i)
      {
        int v = _random.Next(Conf.maxTaskValue);

        lock(_queue.aLock)
        {
          _queue.Enqueue(v);
        }
        Util.DEBUG("[Producer] Enqueued {0}", v);
        /**
         * If the producer doesn't sleep at all, this loop will
         * occupy a core until it ends. On a single-core machine,
         * the consumers may then get little chance to run until
         * the producer has finished its work.
         */
        Thread.Sleep(_sleepInterval);
      }

      Util.DEBUG("Producer ended");
    }
  }

  /**
   * Consumers check workQueue. If there's a task, a consumer
   * dequeues it and processes it.
   */
  class Consumer
  {
    private WorkQueue _queue;
    private int _taskLeft;
    public int NumTaskDone
    { get; set; }
    public bool EvEnd
    { get; set; }
    private const int _sleepInterval = 1;

    public Consumer(WorkQueue q)
    {
      _queue = q;
      _taskLeft = 0;
      EvEnd = false;
      NumTaskDone = 0;
    }

    public void Start()
    {
      Util.DEBUG("Consumer started");

      while (true) {
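        int v = 0;
        /**
         * Read the end signal before checking the queue. The signal
         * is set only after the producer has finished, so an empty
         * queue seen afterwards means there is really no work left.
         */
        bool endRequested = EvEnd;

        // Check if we have a task to do
        lock (_queue.aLock)
        {
          _taskLeft = _queue.Length;
          if (_taskLeft > 0)
          {
            v = _queue.Dequeue();
          }
        }

        // Process a task.
        if (_taskLeft > 0)
        {
          Util.DEBUG("[Worker] Dequeued {0}", v);
          NumTaskDone++;
        }

        /**
         * Ending condition:
         * No work left in the queue and ending event received.
         */
        if (endRequested && _taskLeft == 0)
        {
          break;
        }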

        /**
         * If a consumer doesn't sleep here, it will hold a core.
         * If the consumers hold all cores, the producer may be
         * starved of CPU time and enqueue very slowly.
         */
        Thread.Sleep(_sleepInterval);
      }

      Util.DEBUG("Consumer ended: processed {0} tasks", NumTaskDone);
    }
  }

  /**
   * A simple custom thread pool that holds the consumers and
   * their threads in arrays.
   */
  class ThreadPool
  {
    Consumer[] consumers = new Consumer[Conf.numWorkers];
    Thread[] threads = new Thread[Conf.numWorkers];

    public ThreadPool(WorkQueue q)
    {
      for (int i = 0; i < Conf.numWorkers; ++i)
      {
        consumers[i] = new Consumer(q);
        threads[i] = new Thread(new ThreadStart(
          consumers[i].Start
        ));
      }
    }

    public void Start()
    {
      for (int i = 0; i < Conf.numWorkers; ++i)
      {
        threads[i].Start();
      }
    }

    public void SetEndEvent()
    {
      for (int i = 0; i < Conf.numWorkers; ++i)
      {
        consumers[i].EvEnd = true;
      }
    }

    public void Join()
    {
      for (int i = 0; i < Conf.numWorkers; ++i)
      {
        threads[i].Join();
      }
    }
  }

  class ProCon
  {
    static void Main(string[] args)
    {
      // Create a work queue.
      WorkQueue workQueue = new WorkQueue();

      // Create a producer.
      Producer producer = new Producer(workQueue);
      Thread tProducer = new Thread(new ThreadStart(producer.Start));

      // Create consumers.
      ThreadPool tConsumers = new ThreadPool(workQueue);

      // Start producer and consumers
      tProducer.Start();
      tConsumers.Start();

      // If producing ends, set an end event signal.
      tProducer.Join();
      tConsumers.SetEndEvent();

      /**
       * TODO: Is there a built-in way to wait until all threads
       * in a given list have ended?
       */
      // Wait until all consumer threads have ended.
      tConsumers.Join();

    }
  }
}
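
The Sleep()-based polling in the sample above could be replaced with wait/notify signalling, which is the role Monitor.Wait()/Monitor.Pulse() would play in C#. Below is a minimal sketch of that idea in C++ with std::condition_variable. It is not a port of the C# code: BlockingQueue, close() and run_pipeline() are names I made up for illustration, and the tasks are just integers that get counted.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical counterpart of WorkQueue: consumers block instead of polling.
class BlockingQueue {
public:
    void push(int v) {
        {
            std::lock_guard<std::mutex> guard(mutex_);
            items_.push(v);
        }
        ready_.notify_one();  // wake one waiting consumer
    }

    // Called by the producer side when no more tasks will arrive.
    void close() {
        {
            std::lock_guard<std::mutex> guard(mutex_);
            closed_ = true;
        }
        ready_.notify_all();  // wake everyone so they can exit
    }

    // Blocks until a task is available. Returns false once the queue
    // has been closed and fully drained.
    bool pop(int& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !items_.empty() || closed_; });
        if (items_.empty()) return false;
        out = items_.front();
        items_.pop();
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<int> items_;
    bool closed_ = false;
};

// Runs one producer (the calling thread) and num_workers consumers.
// Returns how many tasks the consumers processed in total.
int run_pipeline(int num_tasks, int num_workers) {
    BlockingQueue queue;
    std::vector<int> done(num_workers, 0);  // one counter per worker
    std::vector<std::thread> workers;
    for (int i = 0; i < num_workers; ++i) {
        workers.emplace_back([&queue, &done, i] {
            int v;
            while (queue.pop(v)) ++done[i];
        });
    }
    for (int i = 0; i < num_tasks; ++i) queue.push(i % 100);
    queue.close();
    for (auto& t : workers) t.join();
    int total = 0;
    for (int n : done) total += n;
    return total;
}
```

Calling run_pipeline(2000, 10) mirrors the 2,000 tasks and 10 workers of the C# sample. The end signal becomes part of the queue state here, so a worker can only see "closed" together with the final queue contents, and no Sleep() interval needs tuning.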

Wednesday, June 12, 2013

Using libssh2 in multithreaded code.


Day 1.

I have a daemon running on CentOS 4.3. An operator told me that the server it resides on had to be moved to another rack, so I decided to upgrade the OS to CentOS 6.3 with gcc-4.4.6 during the migration. I moved the daemon and recompiled it, but it stopped after running for several minutes.

The daemon spawns many threads, and each thread has a socket connected via boost::asio. Each thread passes its connected socket to libssh2_session_handshake() to set up an SSH connection. libssh2 in turn uses the libgcrypt library to generate 16 bytes of random numbers. However, gdb didn't show the call stack once execution went into the library routines (it showed function names as ???()). To find out where it stopped, I downloaded the source of each library (libssh2, libgcrypt, libgpg-error) and compiled them myself. gdb finally showed that libgcrypt tries to acquire a mutex lock to access its random pool when generating random numbers, and that it aborts on an assertion when it fails to get that lock. It works well in a single-threaded environment. This isn't a typical way to use the libssh2 library, and I found no helpful answer on Google... TT.

Day 2.

After more googling and binging, I found some interesting information. Actually, both libssh2 and libgcrypt are designed to work in multithreaded environments. Nothing had to be done for libssh2 to run in multithreaded code, but something had to be done for libgcrypt. Because it doesn't know which threading mechanism the host application uses, it provides two sets of mutex structures and functions as macros: one for pthread and the other for GNU Pth. So the application code that I maintain has to select which macro to use by adding the following lines before it spawns any threads (pthread selected).

#include <gcrypt.h>
GCRY_THREAD_OPTION_PTHREAD_IMPL;

It should also call gcry_control() so that the library registers those mutex callbacks, like below.

gcry_control(GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);

Several days have passed, and the server has worked well so far. Hooray! :)
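
Since the setup has to happen before the worker threads start using the library, one way to make that hard to get wrong is to funnel it through a run-once guard. Here is a minimal sketch with std::call_once. All the names are hypothetical, and init_crypto_library() is only a stand-in so the example compiles without libgcrypt: in the daemon, the gcry_control() call shown above would go there.

```cpp
#include <atomic>
#include <mutex>
#include <thread>
#include <vector>

namespace {
std::once_flag g_crypto_once;
std::atomic<int> g_init_calls{0};  // only here to observe the behaviour

// Hypothetical stand-in for the real setup. In the daemon this is where
// gcry_control(GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread) would go.
void init_crypto_library() {
    ++g_init_calls;
}
}  // namespace

// Safe to call from any thread; the setup body runs exactly once.
void ensure_crypto_initialized() {
    std::call_once(g_crypto_once, init_crypto_library);
}

// Spawns num_threads workers that all request initialization and
// returns how many times the setup actually ran.
int count_init_calls(int num_threads) {
    ensure_crypto_initialized();  // done up front, before any thread exists
    std::vector<std::thread> workers;
    for (int i = 0; i < num_threads; ++i) {
        workers.emplace_back([] {
            ensure_crypto_initialized();  // no-op after the first call
            // ... a real worker would open its SSH session here ...
        });
    }
    for (auto& t : workers) t.join();
    return g_init_calls.load();
}
```

The call inside each worker is redundant here, but it shows that a late or repeated request is harmless, which helps when threads are created from several places in the code.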

Thursday, March 28, 2013

How to check if a value is NaN in C++


I didn't know until now that the comparison a != a evaluates to true only when a is NaN.


#include <iostream>
#include <cmath>

using namespace std;

int main() {
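    float a = sqrt(-1.0f); // NaN: square root of a negative number
    if (a != a) { // NaN is the only value that is not equal to itself
        cout << "It's NaN!" << endl;
        cout << a << endl; // prints 'nan' (or '-nan' on some platforms)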
    }
    else {
        cout << "It's OK!" << endl;
    }
    return 0;
}
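
The same check can be wrapped in a small function and compared with std::isnan from <cmath> (available since C++11). This is just a sketch, and the helper names are my own:

```cpp
#include <cmath>
#include <limits>

// Returns true only for NaN: IEEE 754 defines NaN as unequal to every
// value, including itself.
bool is_nan_by_self_compare(double x) {
    return x != x;
}

// Standard-library equivalent (C++11 and later).
bool is_nan_by_std(double x) {
    return std::isnan(x);
}

// Helpers that produce the special values for experiments.
double make_nan() { return std::numeric_limits<double>::quiet_NaN(); }
double make_inf() { return std::numeric_limits<double>::infinity(); }
```

Both functions return true for make_nan() and false for make_inf() or any ordinary number. One caveat: compiler options that relax IEEE semantics, such as GCC's -ffast-math, may optimize the x != x comparison away, so the trick relies on default floating-point settings.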