有一种单一写线程,多个读线程并发的场景,比如测量数据的读取与更新,消费者会比较多,生产者只有一个。以下图为例:
左侧是一种经典的解法,对数据整个操作加锁。为了一个写数据线程,于将所有读线程也进行加锁显然有点浪费了。于是提出读写锁(Reader/Writer Lock), 即使是使用了读写锁,其本质也是一样的,而且在POSIX下的pthread它的内部实现是基于mutex,所以它的开销更大。如果没有很重的读操作来抵消它引入的开销,反而会引起性能的下降。已经多组测试数据来证明这一点。我自己也做了验证,得到数据如下 (单个写线程,20个读线程),使用读写锁反而比使用mutex要慢。详细可以参考两个链接:
* Mutex or Reader Writer Lock
* Multi-threaded programming: efficiency of locking
这一类问题,在数据库领域有一类解决方案,被称为Multiversion Concurrency Control, 其目的是以增加数据复本保证用户每一次使用都可以用到完整的数据,但不一定是最新的数据。再简化一点,其思想就是建立一个数据复本,专门用于写。当数据完全准备好后,切换出来供其它线程读。原本的数据就转为下一次写使用。 即上图中右侧所示的方式。
以这个方案,只要对Writing/Reading的处理加锁就可以了。这样测试出来的性能开销因为加锁的处理时间极短,较一般Mutex和Reader/Writer Lock都要好 (最后一个算法):
详细的不展开了。另外有一些更为通用的方式,包括平衡读写的吞吐的问题,称为Spin Buffer,有兴趣可以进一步研究。
附源代码如下供参考:
#include <pthread.h>
#include <iostream>
#include <stdio.h>
#include <unistd.h>
#include <ctime>
// #define USE_MUTEX
// #define USE_RW_LOCK
// X + Y = 0
typedef struct _Data{
int x;
int y;
} Data;
namespace {
Data globalData[2] = {{1,-1}, {1,-1}};
int WriteIndex = 0;
int ReadingIndex = 1;
float globalReadingTimeCost = 0.0f;
#ifdef USE_MUTEX
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
#endif
#ifdef USE_RW_LOCK
pthread_rwlock_t rwlock;
#endif
const int thread_number = 20;
}
void* write_thread(void* param) {
clock_t begin_time = std::clock();
for(int i=1; i<=1000; i++) {
globalData[WriteIndex].x = i;
globalData[WriteIndex].y = -1 * i;
usleep(1);
#ifdef USE_MUTEX
pthread_mutex_lock(&mutex);
#endif
#ifdef USE_RW_LOCK
pthread_rwlock_rdlock(&rwlock);
#endif
ReadingIndex = WriteIndex;
WriteIndex = (WriteIndex + 1) % 2;
#ifdef USE_MUTEX
pthread_mutex_unlock(&mutex);
#endif
#ifdef USE_RW_LOCK
pthread_rwlock_unlock(&rwlock);
#endif
usleep(600);
}
std::cout<< "[Writing Thread]" << float( std::clock () - begin_time ) / CLOCKS_PER_SEC * 1000 << std::endl;
return NULL;
}
void* read_thread(void* param) {
clock_t begin_time = std::clock();
for(int i=1; i<=20000; i++) {
#ifdef USE_MUTEX
pthread_mutex_lock(&mutex);
#endif
#ifdef USE_RW_LOCK
pthread_rwlock_wrlock(&rwlock);
#endif
int index = ReadingIndex;
#ifdef USE_MUTEX
pthread_mutex_unlock(&mutex);
#endif
#ifdef USE_RW_LOCK
pthread_rwlock_unlock(&rwlock);
#endif
int x = globalData[index].x;
int y = globalData[index].y;
if (x + y != 0) {
std::cout << std::endl << "Wrong data:" << x << "," << y << std::endl;
}
usleep(3);
}
std::cout<< "[Reading Thread]" << float( std::clock () - begin_time ) / CLOCKS_PER_SEC * 1000 << std::endl;
return NULL;
}
int main(void) {
int ret = 0;
pthread_t id[thread_number];
#ifdef USE_RW_LOCK
pthread_rwlock_init(&rwlock, NULL);
#endif
clock_t begin_time = std::clock();
// One writing thread
ret = pthread_create(&id[0], NULL, write_thread, NULL);
if (ret) {
std::cout << "Failed to launch writing thread." << std::endl;
return -1;
}
// Four reading threads
for (int i=1; i<thread_number; i++) {
pthread_create(&id[i], NULL, read_thread, NULL);
}
for (int i=0; i<=thread_number; i++) {
pthread_join(id[i], NULL);
}
std::cout<< "Cost:" << float( std::clock () - begin_time ) / CLOCKS_PER_SEC * 1000 << std::endl;
return 0;
}
使用如下方式编译测试:
g++ -std=c++11 -DUSE_MUTEX thread.cc -lpthread -o thread
有空再写篇关于多线程算法选择的文档!