Friday, June 6, 2008

Threads, condition variables

Thread를 동기화하는 다른 방법 중의 하나로 Condition variable을 사용하는 것이 있다. condition variable을 잘 사용하면 race-free한 thread 코드를 만들 수 있다.

condition variable자체는 mutex 에 의해 보호된다. 때문에 condition 상태를 바꾸기 위해서는 먼저 mutex lock을 걸어줘야한다. condition variable(이하 cv) 초기화는 두가지 방법이 있는데, cv가 static이면 PTHREAD_COND_INITIALIZER를 넣어주면 되고 dynamic 이면(동적 할당되었으면) pthread_conf_init()을 쓰면된다. 사용이 끝나면 pthread_cond_destroy()를 호출해준다.

#include <pthread.h>
// 0 for OK
int pthread_cond_init(pthread_cond_t *restrict cond, pthread_condattr_t *restrict attr);
int pthread_cond_destroy(pthread_cond_t *cond);

cv를 체크하는 방법으로는 아래의 두 함수를 사용할 수 있다. 아래 wait함수를 호출할 때는 인자로 넣어주는 mutex가 lock된 상태여야 한다. 그러면 wait 함수 내부에서 cv를 기다리는 threads list에 함수를 호출한 thread를 넣고 mutex를 unlock한 후 wait상태에 들어가게 된다. wait하다가 condition 이 true가 되면 mutex를 lock한 상태로 리턴하게 된다. timedwait함수의 경우는 얼만큼 오래 기다릴지를 timespec으로 지정해줄 수 있다. timedwait의 경우 condition이 발생하지 않았는데 expire된 경우 lock을 한 상태에서 ETIMEOUT error를 반환한다.

#include <pthread.h>
// 0 for OK
int pthread_cond_wait(pthread_cont_t *restrict cond, pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict timeout);

wait 함수들은 condition이 발생했다는 것은 알려주지만 자신이 이를 처음으로 통보받았다고 보장하지는 않는다. (예를 들어 한개의 condition이 있고 4개의 thread 가 있다면 각각의 thread가 pthread_cond_wait()하다가 condition이 발생해서 pthread_cond_wait()을 빠져 나왔다면, condition이 발생한 것은 확실하지만, 제일 처음 이를 통보받은 thread가 condition과 관련된 부분을 변경했을 수도 있다는 것이다) 때문에 이를 위해 condition을 다시 체크해보아야 한다.

이번에는 반대로 condition을 변경해주는 부분을 생각해보자. 아래 함수들을 사용해서 두가지 방법으로 condition이 변경되었음을 알려줄 수 있는데, signal 은 기다리고 있는 thread중 하나만 wakeup시켜주고 broadcast는 모든 thread를 깨워준다.

#include <pthread.h>
// 0 for OK
int pthread_cond_sigal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);

아래 APUE 에서의 간단한 예제를 살펴보자.

#include <pthread.h>

// queue의 노드
struct msg {
struct msg *next;
/* ... */
}

struct msg *workq;
// condition variable
pthread_cond_t qready = PTHREAD_COND_INITIALIZER;
// mutex to protect c.v.
pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;

void process_msg(void)
{
struct msg *mp;
while (1) {
pthread_mutex_lock(&qlock);
// 이미 lock했어도 아래 ...wait 에서
// condition variable을 등록 후 내부
// 적으로 unlock하기 때문에 block상
// 태가 된다.
while (workq == NULL)
pthread_cond_wait(&qready, &qlock);
// wait 에서 condition 이 바뀐 것을
// 통보 받고 lock된 상태로 빠져 나왔음
// 아래 enqueue_msg 에서 broadcast로
// 통보 했다면 여기서 queue가 비었는지
// 다시체크해야하나, 그렇게 할 경우
// thundering herd가 발생할 소지가 있음

// 원래 queue는 tail에서 값을 뽑아야 하는데,
// 여기서는 head에 넣었다가 head 에서 뽑는다
// (FIFO가 아니 LIFO,..stack이다..)
// 이유는... 글쎄 ㅡ.ㅡ;..
mp = workq;
workq = mp->next;
pthread_mutex_unlock(&qlock);
// processing msg here
}
}

void enqueue_msg(struct msg *mp)
{
pthread_mutex_lock(&qlock);
mp->next = workq;
workq = mp;
pthread_mutex_unlock(&qlock);
pthread_cond_signal(&qready);
}

No comments:

Post a Comment