解决Nginx和Fpm-Php等内部多进程之间共享数据问题
概念说明:
1. MINIT:Php扩展的初始化方法,整个模块启动时候被调用一次
2. RINIT:Php扩展的初始化方法,每个请求会调用一次
3. ClusterMap(简称CM):提供服务定位和集群地图功能,通过接收心跳和主动探测方式收集节点状态信息,统一管理多种异构集群,替换硬负载均衡设备
4. CMSubProxy:ClusterMap内部的一个订阅者客户端代理,定期和Server端通讯,获取最新的集群信息,更新内部维护的机器列表
问题描述
Nginx或者Php-CGI都是使用多进程提供大并发服务的,如果服务内部想要提供一个通用的功能模块,需要用户自己写一个Extension或者Module, 最近在做ClusterMap的订阅者客户端,订阅者客户端即Php的一个扩展,请求到来时,Php扩展会与CMServer通讯获取最新的机器列表,但是如果每次请求都去获取机器列表开销又特别大
在Apache的Module模式下,实现是简单的,Apache首先启动父进程A会调用MINIT方法,调用完成后Fork其它Httpd子进程B,A和B是父子关系,这样父进程A就可以定期更新集群信息,然后通过管道方式和子进程通讯,子进程在每个请求过来时,读取管道消息(即机器列表),实现了服务定位;但是Php-fpm模式略有不同,Php-fpm进程管理器启动进程A会调用MINIT方法,然后Fork出一个Fpm-Master进程B,进程B启动多个Php-CGI子进程C,启动工作完成后,启动进程A就退出了,子进程在每个请求过来时调用RINIT,这时父子AC进程管道通讯建立不起来,管道的数据没有办法消费,使得子进程C如果写满就会阻塞。其实这个问题很普遍,如果用修改Php的源码方式解决不见得是一个好的解决方案。
问题分析
总结一下上述问题,说白了就是多个服务进程,每个进程在接到请求后,首先需要服务定位,获得最新的机器列表(需要一次网络开销),然后再转发请求给其他的服务,接下来我们以Fpm-Php为例介绍如果解决上述问题
方案一:RInit中每个请求都先到Server端获取最新的机器列表
1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 10秒钟过去了
如果这时你还没有发现问题,那么你就没有必要学习Nginx这个骑着比驴都快的玩意了,很明显,RInit中每个请求都要多一次网络开销,去Server端获取最新的机器列表,大大增加了整个请求的响应时间。
这时有人说了这个问题好解决,我不需要这么频繁的更新就好了,10请求,100请求更新一次,或者1s秒钟,10s更新一次,既不影响性能,又能达到更新效果,在性能和更新频率上做Trade-Off,这样总可以吧,于是就有了方案二
方案二:RInit中每个请求都先到Server端获取最新的机器列表,同时从Server端拿到一个过期时间,后面的请求如果没有超过过期时间则不需要再次去Server端获取更新了
方案二可以说是能解决问题,严格来说只能是部分问题,治标不治本
因为如果你想要更好性能,对于每个进程来说,就必须把更新周期变慢,失去准确性,如果想要更高的准确行,就需要每个进程频繁更新,在搜索和广告这种大并发,超时敏感的服务面前这种方案太不友好了,最重要的事,每个Worker进程都要去更新,虽然每个进程拿到的都是完全相同的信息,这里不是说Nginx的多进程模式不好,这种模式有它存在的意义,而且事实也证明了,多进程正是Nginx的高明之处,每个Worker都是独立的进程,编程简单且不需要加锁,进程间又互不影响,降低风险。
方案三:使用共享内存方式,单独启动一个更新进程,实时更新集群节点信息,写入共享内存,RInit中每个请求先读取共享内存获取最新的机器列表
方案三利用了多个Worker进程获取的机器列表相同这个特点,通过共享内存的方式在进程间共享数据,这样Worker进程既不需要网络开销,又可以快速的获取最新机器列表
解决方式
目前ClusterMap中采用的是方案三,通过共享内存的方式解决这个问题的
CMSubProxy是一个单独的更新进程,每隔500ms会向CMServer发送一个请求,获取最新的机器列表,收到响应消息后,CMSubProxy会更新进程内部维护的机器列表,更新成功后会写到共享内存中;
Php-fpm进程在每个请求到来时,首先会读取共享内存中的机器列表,然后再将请求转发给列表中的某一台可用机器,机器的选择有多种策略(轮询,随机,权重,一致性哈希等);
共享内存是mmap打开的,需要注意的是,在更新和读取的时候需要读写锁,并且锁信号量要在共享内存中,关于多个进程间共享内存锁
pthread_rwlockattr_t attr;
pthread_rwlockattr_init(&attr);
pthread_rwlockattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
手册上对pthread_rwlockattr_setpshared的描述
pthread_rwlockattr_setpshared(pthread_rwlockattr_t *attr, int pshared);
DESCRIPTION
The pthread_rwlockattr_setpshared() function sets the process-shared attribute of attr to the value referenced by pshared. pshared may be one of two values:
PTHREAD_PROCESS_SHARED Any thread of any process that has access to the memory where the read/write lock resides can manipulate the lock.
CMSubProxy写入共享内存中的数据是分全量和增量的,增量数据写在全量数据之后,这里就不详细讲述了
解决进程间共享内存,由于某个进程异常退出导致死锁问题
现在问题已经确认就是获得读锁后进程异常退出导致的,我写个测试程序复现这个问题
(! 2293)-> cat test/read_shared.cpp
#include
SharedUpdateData* _sharedUpdateData = NULL;
cm_sub::CMMapFile* _mmapFile = NULL;
int32_t initSharedMemRead(const std::string& mmap_file_path)
{
_mmapFile = new (std::nothrow) cm_sub::CMMapFile();
if (_mmapFile == NULL || !_mmapFile->open(mmap_file_path.c_str(), FILE_OPEN_WRITE) )
{
return -1;
}
_sharedUpdateData = (SharedUpdateData*)_mmapFile->offset2Addr(0);
return 0;
}
int main(int argc, char** argv)
{
if (initSharedMemRead(argv[1]) != 0) return -1;
int cnt = 100;
while (cnt > 0)
{
pthread_rwlock_rdlock( &(_sharedUpdateData->_lock));
fprintf(stdout, "version = %ld, readers = %u\n",
_sharedUpdateData->_version, _sharedUpdateData->_lock.__data.__nr_readers);
if (cnt == 190)
{
exit(0);
}
sleep(1);
pthread_rwlock_unlock( &(_sharedUpdateData->_lock));
-- cnt;
usleep(100*1000);
}
delete _mmapFile;
}
(! 2293)-> cat test/write_shared.cpp
#include
SharedUpdateData* _sharedUpdateData = NULL;
cm_sub::CMMapFile* _mmapFile = NULL;
int32_t initSharedMemWrite(const char* mmap_file_path)
{
_mmapFile = new (std::nothrow) cm_sub::CMMapFile();
if ( _mmapFile == NULL || !_mmapFile->open(mmap_file_path, FILE_OPEN_WRITE, 1024) )
{
return -1;
}
_sharedUpdateData = (SharedUpdateData *)_mmapFile->offset2Addr(0);
madvise(_sharedUpdateData, 1024, MADV_SEQUENTIAL);
pthread_rwlockattr_t attr;
memset(&attr, 0x0, sizeof(pthread_rwlockattr_t));
if (pthread_rwlockattr_init(&attr) != 0 || pthread_rwlockattr_setpshared(&attr, PTHREAD_PROCESS_SHARED) != 0)
{
return -1;
}
pthread_rwlock_init( &(_sharedUpdateData->_lock), &attr);
_sharedUpdateData->_updateTime = autil::TimeUtility::currentTime();
_sharedUpdateData->_version = 0;
return 0;
}
int main()
{
if (initSharedMemWrite("data.mmap") != 0) return -1;
int cnt = 200;
while (cnt > 0)
{
pthread_rwlock_wrlock( &(_sharedUpdateData->_lock));
++ _sharedUpdateData->_version;
fprintf(stdout, "version = %ld, readers = %u\n",
_sharedUpdateData->_version, _sharedUpdateData->_lock.__data.__nr_readers);
sleep(1);
pthread_rwlock_unlock( &(_sharedUpdateData->_lock));
-- cnt;
usleep(100*1000);
}
delete _mmapFile;
}
无论是读进程还是写进程,获取锁后来不及释放就挂掉都会有这样的问题
如何解决
问题已经复现,想想如何用一个好的办法解决,在网上找了一遍,针对读写锁没有什么好的解决办法,只能在逻辑上自己解决,能想到的是使用超时机制,即写进程内部增加一个超时时间,如果写进程到了这个时间还是不能获得锁,就认为死锁,将读进程的计数减1,这是一个暴力的解决办法,不解释了,如果谁有好的解决办法指导我下
看下读写锁的代码,读写锁和互斥锁相比,更适合用在读多写少的场景,如果读进程需要锁住时间久,就更合适使用读写锁了,我的应该场景是,读多写少,读写时间都非常短;暂时认为互斥锁和读写锁性能差别应该不大,其实读写锁内部同样使用了互斥锁,只不过是锁的时间比较短,锁住互斥区,进去看下是否有人正在写,然后就释放了,
需要注意的是,读写锁默认是写优先的,也就是说当正在写,或者进入写队列准备写时,读锁都是加不上的,需要等待
好,那我们看看互斥锁能否解决我们的问题,互斥锁内部有一个属性叫Robust锁
设置锁为Robust锁: pthread_mutexattr_setrobust_np
The robustness attribute defines the behavior when the owner
of a mutex dies. The value of robustness could be either
PTHREAD_MUTEX_ROBUST_NP or PTHREAD_MUTEX_STALLED_NP, which
are defined by the header <pthread.h>. The default value of
the robustness attribute is PTHREAD_MUTEX_STALLED_NP.
When the owner of a mutex with the PTHREAD_MUTEX_STALLED_NP
robustness attribute dies, all future calls to
pthread_mutex_lock(3C) for this mutex will be blocked from
progress in an unspecified manner.
修复非一致的Robust锁: pthread_mutex_consistent_np
A consistent mutex becomes inconsistent and is unlocked if
its owner dies while holding it, or if the process contain-
ing the owner of the mutex unmaps the memory containing the
mutex or performs one of the exec(2) functions. A subsequent
owner of the mutex will acquire the mutex with
pthread_mutex_lock(3C), which will return EOWNERDEAD to
indicate that the acquired mutex is inconsistent.
The pthread_mutex_consistent_np() function should be called
while holding the mutex acquired by a previous call to
pthread_mutex_lock() that returned EOWNERDEAD.
Since the critical section protected by the mutex could have
been left in an inconsistent state by the dead owner, the
caller should make the mutex consistent only if it is able
to make the critical section protected by the mutex con-
sistent.
简单来说就是当发现EOWNERDEAD时,pthread_mutex_consistent_np函数内部会判断这个互斥锁是不是Robust锁,如果是,并且他OwnerDie了,那么他会把锁的owner设置成自己的进程ID,这样这个锁又可以恢复可用,很简单吧
锁释放是可以解决了,但是通过共享内存在进程间共享数据时,还有一点是需要注意的,就是数据的正确性,即完整性,进程共享不同与线程,如果是一个进程中的多个线程,那么进程异常退出了,其他线程也同时退出了,进程间共享都是独立的,如果一个写线程在写共享数据的过程中,异常退出,导致写入的数据不完整,读进程读取时就会有读到不完整数据的问题,其实数据完整性非常好解决,只需要在共享内存中加一个完成标记就好了,锁住共享区后,写数据,写好之后标记为完成,就可以了,读进程在读取时判断一下完成标记
测试代码见:
(! 2295)-> cat test/read_shared_mutex.cpp
#include
SharedUpdateData* _sharedUpdateData = NULL;
cm_sub::CMMapFile* _mmapFile = NULL;
int32_t initSharedMemRead(const std::string& mmap_file_path)
{
_mmapFile = new (std::nothrow) cm_sub::CMMapFile();
if (_mmapFile == NULL || !_mmapFile->open(mmap_file_path.c_str(), FILE_OPEN_WRITE) )
{
return -1;
}
_sharedUpdateData = (SharedUpdateData*)_mmapFile->offset2Addr(0);
return 0;
}
int main(int argc, char** argv)
{
if (argc != 2) return -1;
if (initSharedMemRead(argv[1]) != 0) return -1;
int cnt = 10000;
int ret = 0;
while (cnt > 0)
{
ret = pthread_mutex_lock( &(_sharedUpdateData->_lock));
if (ret == EOWNERDEAD)
{
fprintf(stdout, "%s: version = %ld, lock = %d, %u, %d\n",
strerror(ret),
_sharedUpdateData->_version,
_sharedUpdateData->_lock.__data.__lock,
_sharedUpdateData->_lock.__data.__count,
_sharedUpdateData->_lock.__data.__owner);
ret = pthread_mutex_consistent_np( &(_sharedUpdateData->_lock));
if (ret != 0)
{
fprintf(stderr, "%s\n", strerror(ret));
pthread_mutex_unlock( &(_sharedUpdateData->_lock));
continue;
}
}
fprintf(stdout, "version = %ld, lock = %d, %u, %d\n",
_sharedUpdateData->_version,
_sharedUpdateData->_lock.__data.__lock,
_sharedUpdateData->_lock.__data.__count,
_sharedUpdateData->_lock.__data.__owner);
sleep(5);
pthread_mutex_unlock( &(_sharedUpdateData->_lock));
usleep(500*1000);
-- cnt;
}
fprintf(stdout, "go on\n");
delete _mmapFile;
}
(! 2295)-> cat test/write_shared_mutex.cpp
#include
SharedUpdateData* _sharedUpdateData = NULL;
cm_sub::CMMapFile* _mmapFile = NULL;
int32_t initSharedMemWrite(const char* mmap_file_path)
{
_mmapFile = new (std::nothrow) cm_sub::CMMapFile();
if ( _mmapFile == NULL || !_mmapFile->open(mmap_file_path, FILE_OPEN_WRITE, 1024) )
{
return -1;
}
_sharedUpdateData = (SharedUpdateData *)_mmapFile->offset2Addr(0);
madvise(_sharedUpdateData, 1024, MADV_SEQUENTIAL);
pthread_mutexattr_t attr;
memset(&attr, 0x0, sizeof(pthread_mutexattr_t));
if (pthread_mutexattr_init(&attr) != 0 || pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED) != 0)
{
return -1;
}
if (pthread_mutexattr_setrobust_np(&attr, PTHREAD_MUTEX_ROBUST_NP) != 0)
{
return -1;
}
pthread_mutex_init( &(_sharedUpdateData->_lock), &attr);
_sharedUpdateData->_version = 0;
return 0;
}
int main()
{
if (initSharedMemWrite("data.mmap") != 0) return -1;
int cnt = 200;
int ret = 0;
while (cnt > 0)
{
ret = pthread_mutex_lock( &(_sharedUpdateData->_lock));
if (ret == EOWNERDEAD)
{
fprintf(stdout, "%s: version = %ld, lock = %d, %u, %d\n",
strerror(ret),
_sharedUpdateData->_version,
_sharedUpdateData->_lock.__data.__lock,
_sharedUpdateData->_lock.__data.__count,
_sharedUpdateData->_lock.__data.__owner);
ret = pthread_mutex_consistent_np( &(_sharedUpdateData->_lock));
if (ret != 0)
{
fprintf(stderr, "%s\n", strerror(ret));
pthread_mutex_unlock( &(_sharedUpdateData->_lock));
continue;
}
}
++ _sharedUpdateData->_version;
fprintf(stdout, "version = %ld, lock = %d, %u, %d\n", _sharedUpdateData->_version,
_sharedUpdateData->_lock.__data.__lock,
_sharedUpdateData->_lock.__data.__count,
_sharedUpdateData->_lock.__data.__owner);
usleep(1000*1000);
pthread_mutex_unlock( &(_sharedUpdateData->_lock));
-- cnt;
usleep(500*1000);
}
delete _mmapFile;
}
BTW:我们都知道加锁是有开销的,不仅仅是互斥导致的等待开销,还有加锁过程都是有系统调用到内核态的,这个过程开销也很大,有一种互斥锁叫Futex锁(Fast User Mutex),Linux从2.5.7版本开始支持Futex,快速的用户层面的互斥锁,Fetux锁有更好的性能,是用户态和内核态混合使用的同步机制,如果没有锁竞争的时候,在用户态就可以判断返回,不需要系统调用,
当然任何锁都是有开销的,能不用尽量不用,使用双Buffer,释放链表,引用计数,都可以在一定程度上替代锁的使用