Redis源码学习之BIO
BIO顾名思义,background IO,是redis中运行的后台IO。 网上千篇一律的说法是redis是单线程单进程。 实际上redis运行过程中并不是严格单进程单线程应用。
Redis中的多进程:
在写入备份(RDB,AOF)的时候,会fork出子进程进行备份文件的写入。
Redis中的多线程:
- AOF的备份模式中,如果我们设置的是AOF_FSYNC_EVERYSEC(每秒备份一次,这个设置可理解为弱同步备份),redis会create一个backgroud线程,在这个线程中执行aof备份文件的写入。
- 新生成的AOF文件,在覆盖旧AOF文件时。 如果在此之前AOF备份已经开启,在执行该fd的close前,我们的Redis进程与旧的AOF文件存在引用, 旧的AOF文件不会真正被删除。 所以当我们执行close(oldfd)时,旧AOF文件的被打开该文件的进程数为0,即没有进程打开过这个文件,这时这个文件在执行close时会被真正删除。 而删除旧AOF文件可能会阻塞服务,所以我们将它放到另一个线程调用。
- 执行DEL操作,假如碰巧这个key对应有非常多对象,那么这个删除操作会阻塞服务器几秒钟时间, 所以将删除操作放到另一个线程执行。 具体可看这篇文章: Lazy Redis is better Redis。
BIO
Redis将所有多线程操作封装到BIO中,在bio.c,bio.h中可以看到。 本文我们关注的不是具体的操作,而是Redis封装的BIO行为, 这个代码简洁,维护性好。 值得学习一下。
BIO提供以下几个api:
void bioInit(void); //初始化BIO
void bioCreateBackgroundJob(int type, void arg1, void arg2, void *arg3); //新建一个BIO任务
unsigned long long bioPendingJobsOfType(int type); //获取当前BIO任务类型,队列中待执行的任务个数
unsigned long long bioWaitStepOfType(int type); //阻塞等待某个类型的BIO任务的执行,返回等待任务个数
void bioKillThreads(void); //中断所有BIO进程
BIO操作的类型:
/ Background job opcodes /
#define BIO_CLOSE_FILE 0 / 关闭文件/
#define BIO_AOF_FSYNC 1 / AOF写入 /
#define BIO_LAZY_FREE 2 / 释放对象 /
#define BIO_NUM_OPS 3 /BIO数/
BIO对象:
static pthread_t bio_threads[BIO_NUM_OPS]; //BIO线程
static pthread_mutex_t bio_mutex[BIO_NUM_OPS]; //BIO每个线程的mutex锁变量
static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS]; //BIO线程锁的条件变量, 监听这个条件变量唤起当前线程
static pthread_cond_t bio_step_cond[BIO_NUM_OPS]; //BIO线程阻塞锁,bioWaitStepOfType监听这个条件变量被通知该操作的执行。
static list *bio_jobs[BIO_NUM_OPS];
static unsigned long long bio_pending[BIO_NUM_OPS]; // BIO未执行的
我们先看初始化的时候执行的部分:
bioInit() {
for (j = 0; j < BIO_NUM_OPS; j++) {
void arg = (void)(unsigned long) j;
if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { // 初始化线程
serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
exit(1);
}
bio_threads[j] = thread;
}
}
主要功能分为两个部分:
- bioCreateBackgroundJob: 创建BIO任务,插入bio_jobs,并调用pthread_cond_signal,通知进程解锁。
- bioProcessBackgroundJobs: 执行BIO任务线程。 线程中通过pthread管理进程锁,当bioCreateBackgroundJob执行pthread_cond_signal通知到该任务对应的线程时,从bio_jobs读出上一个任务,并执行。
bioCreateBackgroundJob
void bioCreateBackgroundJob(int type, void arg1, void arg2, void *arg3) {
struct bio_job job = zmalloc(sizeof(job));
job->time = time(NULL);
job->arg1 = arg1;
job->arg2 = arg2;
job->arg3 = arg3;
pthread_mutex_lock(&bio_mutex[type]); // 加锁 保护bio_jobs和bio_pending的一致性
listAddNodeTail(bio_jobs[type],job); //插入到任务队列中
bio_pending[type]++;
pthread_cond_signal(&bio_newjob_cond[type]); //通知preocess线程,执行任务
pthread_mutex_unlock(&bio_mutex[type]); //解锁
}
bioProcessBackgroundJobs
void bioProcessBackgroundJobs(void arg) {
// 使进程可以被手动kill
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
// 加锁 确保不会有两个进程使用pthread_cond_wait监听同一个锁
pthread_mutex_lock(&bio_mutex[type]);
while(1) {
listNode *ln;
/ The loop always starts with the lock hold. /
if (listLength(bio_jobs[type]) == 0) {
pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]); // 等待bioCreateBackgroundJob通知解锁
continue;
}
//取队列中第一个任务
ln = listFirst(bio_jobs[type]);
job = ln->value;
/* It is now possible to unlock the background system as we know have
a stand alone job structure to process./
pthread_mutex_unlock(&bio_mutex[type]); //解锁
// 根据type执行任务
// do somethings...
pthread_cond_broadcast(&bio_step_cond[type]); // 广播解锁,用于解bioWaitStepOfType中的锁, 接触阻塞。
pthread_mutex_lock(&bio_mutex[type]); // 为下面的操作加锁,且用于下一个循环的pthread_cond_wait阻塞。
listDelNode(bio_jobs[type],ln); // 操作bio_jobs 和 bio_pending 标志这个任务已完成。
bio_pending[type]--;
}
}
pthread
整个BIO就是通过锁进行的阻塞后台IO。 如果我们梳理一下这个锁过程:
- bioInit,新建线程,执行bioProcessBackgroundJobs。
- bioProcessBackgroundJobs 中,pthread_mutex_lock(&bio_mutex[type]),给该任务的锁变量加锁。
- 进入while循环, 调用pthread_cond_wait, 等待解锁。 由于mutex锁是“sleep-lock”,线程会sleep,等待唤醒。
- 主线程调用创建BIO任务, 调用bioCreateBackgroundJob。
- bioCreateBackgroundJob中 pthread_mutex_lock(&bio_mutex[type]); 又对bio_mutex[type]加锁
- bioCreateBackgroundJob中pthread_cond_signal(&bio_newjob_cond[type]) //发送信号,通知BIO线程继续执行。
- bioCreateBackgroundJob中pthread_mutex_unlock(&bio_mutex[type]); //解锁
- bioProcessBackgroundJobs 中被唤醒继续进行。
- 执行任务完毕后,pthread_mutex_unlock解锁, pthread_cond_broadcast广播解锁。
- 再pthread_mutex_lock加锁 。 用于下一次while循环。
在梳理的时候,我发现一个奇怪的地方,我们第2步在BIO线程中加锁,第5步调用bioCreateBackgroundJob在主线程中又对mutex进行了一次加锁。 而在他们之间并没有pthread_mutex_unlock执行。 为什么bioCreateBackgroundJob没有被mutex的锁阻塞?
一切的关键都在pthread_cond_wait这个函数中。 按照我原来的理解,pthread_cond_wait应该只是进行了一次信号等待, 等到某个信号后,将mutex[type]解锁。 为什么在信号发送前,pthread_mutex_lock没有将主线程的bioCreateBackgroundJob阻塞住。 所以我猜测, pthread_cond_wait不不仅仅是一次wait signal,而是unlock+wait。
为了验证这个猜想,我们进去看pthread_cond_wait的实现:
glibc中的pthread_cond_wait
// line 93
int __pthread_cond_wait (cond, mutex)
// line 110
err = __pthread_mutex_unlock_usercnt (mutex, 0); //解锁mutex
do {
// line 155
lll_futex_wait (&cond->__data.__futex, futex_val, pshared); // wait signal
} while (val == seq || cond->__data.__woken_seq == val);
// line 193
return __pthread_mutex_cond_lock (mutex);
可以看到, pthread_cond_wait 实际上就是一次 Unlock -> Wait -> Lock。