进程间共享内存 由于某个进程异常退出导致死锁

解决Nginx和Fpm-Php等内部多进程之间共享数据问题

概念说明:

1. MINIT:Php扩展的初始化方法,整个模块启动时候被调用一次

2. RINIT:Php扩展的初始化方法,每个请求会调用一次

3. ClusterMap(简称CM):提供服务定位和集群地图功能,通过接收心跳和主动探测方式收集节点状态信息,统一管理多种异构集群,替换硬负载均衡设备

4. CMSubProxy:ClusterMap内部的一个订阅者客户端代理,定期和Server端通讯,获取最新的集群信息,更新内部维护的机器列表

问题描述

    Nginx或者Php-CGI都是使用多进程提供大并发服务的,如果服务内部想要提供一个通用的功能模块,需要用户自己写一个Extension或者Module, 最近在做ClusterMap的订阅者客户端,订阅者客户端即Php的一个扩展,请求到来时,Php扩展会与CMServer通讯获取最新的机器列表,但是如果每次请求都去获取机器列表开销又特别大

    在Apache的Module模式下,实现是简单的,Apache首先启动父进程A会调用MINIT方法,调用完成后Fork其它Httpd子进程B,A和B是父子关系,这样父进程A就可以定期更新集群信息,然后通过管道方式和子进程通讯,子进程在每个请求过来时,读取管道消息(即机器列表),实现了服务定位;但是Php-fpm模式略有不同,Php-fpm进程管理器启动进程A会调用MINIT方法,然后Fork出一个Fpm-Master进程B,进程B启动多个Php-CGI子进程C,启动工作完成后,启动进程A就退出了,子进程在每个请求过来时调用RINIT,这时父子AC进程管道通讯建立不起来,管道的数据没有办法消费,使得子进程C如果写满就会阻塞。其实这个问题很普遍,如果用修改Php的源码方式解决不见得是一个好的解决方案。

问题分析

总结一下上述问题,说白了就是多个服务进程,每个进程在接到请求后,首先需要服务定位,获得最新的机器列表(需要一次网络开销),然后再转发请求给其他的服务,接下来我们以Fpm-Php为例介绍如果解决上述问题
方案一:RInit中每个请求都先到Server端获取最新的机器列表

   1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 10秒钟过去了
   如果这时你还没有发现问题,那么你就没有必要学习Nginx这个骑着比驴都快的玩意了,很明显,RInit中每个请求都要多一次网络开销,去Server端获取最新的机器列表,大大增加了整个请求的响应时间。
   这时有人说了这个问题好解决,我不需要这么频繁的更新就好了,10请求,100请求更新一次,或者1s秒钟,10s更新一次,既不影响性能,又能达到更新效果,在性能和更新频率上做Trade-Off,这样总可以吧,于是就有了方案二

方案二:RInit中每个请求都先到Server端获取最新的机器列表,同时从Server端拿到一个过期时间,后面的请求如果没有超过过期时间则不需要再次去Server端获取更新了

   方案二可以说是能解决问题,严格来说只能是部分问题,治标不治本
   因为如果你想要更好性能,对于每个进程来说,就必须把更新周期变慢,失去准确性,如果想要更高的准确行,就需要每个进程频繁更新,在搜索和广告这种大并发,超时敏感的服务面前这种方案太不友好了,最重要的事,每个Worker进程都要去更新,虽然每个进程拿到的都是完全相同的信息,这里不是说Nginx的多进程模式不好,这种模式有它存在的意义,而且事实也证明了,多进程正是Nginx的高明之处,每个Worker都是独立的进程,编程简单且不需要加锁,进程间又互不影响,降低风险。

方案三:使用共享内存方式,单独启动一个更新进程,实时更新集群节点信息,写入共享内存,RInit中每个请求先读取共享内存获取最新的机器列表

   方案三利用了多个Worker进程获取的机器列表相同这个特点,通过共享内存的方式在进程间共享数据,这样Worker进程既不需要网络开销,又可以快速的获取最新机器列表

解决方式

目前ClusterMap中采用的是方案三,通过共享内存的方式解决这个问题的

    CMSubProxy是一个单独的更新进程,每隔500ms会向CMServer发送一个请求,获取最新的机器列表,收到响应消息后,CMSubProxy会更新进程内部维护的机器列表,更新成功后会写到共享内存中;
    Php-fpm进程在每个请求到来时,首先会读取共享内存中的机器列表,然后再将请求转发给列表中的某一台可用机器,机器的选择有多种策略(轮询,随机,权重,一致性哈希等);

    共享内存是mmap打开的,需要注意的是,在更新和读取的时候需要读写锁,并且锁信号量要在共享内存中,关于多个进程间共享内存锁

     pthread_rwlockattr_t attr;
     pthread_rwlockattr_init(&attr);
     pthread_rwlockattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);

     手册上对pthread_rwlockattr_setpshared的描述

     pthread_rwlockattr_setpshared(pthread_rwlockattr_t *attr, int pshared);

     DESCRIPTION
       The pthread_rwlockattr_setpshared() function sets the process-shared attribute of attr to the value referenced by pshared.  pshared may be one of two values:

       PTHREAD_PROCESS_SHARED   Any thread of any process that has access to the memory where the read/write lock resides can manipulate the lock.

CMSubProxy写入共享内存中的数据是分全量和增量的,增量数据写在全量数据之后,这里就不详细讲述了

解决进程间共享内存,由于某个进程异常退出导致死锁问题

现在问题已经确认就是获得读锁后进程异常退出导致的,我写个测试程序复现这个问题

(! 2293)-> cat test/read_shared.cpp

#include

SharedUpdateData*   _sharedUpdateData = NULL;
cm_sub::CMMapFile*  _mmapFile = NULL;

int32_t initSharedMemRead(const std::string& mmap_file_path)
{
    _mmapFile = new (std::nothrow) cm_sub::CMMapFile();
    if (_mmapFile == NULL || !_mmapFile->open(mmap_file_path.c_str(), FILE_OPEN_WRITE) )
    {
        return -1;
    }
    _sharedUpdateData = (SharedUpdateData*)_mmapFile->offset2Addr(0);
    return 0;
}

int main(int argc, char** argv)
{
    if (initSharedMemRead(argv[1]) != 0) return -1;

    int cnt = 100;
    while (cnt > 0)
    {
        pthread_rwlock_rdlock( &(_sharedUpdateData->_lock));
        fprintf(stdout, "version = %ld, readers = %u\n",
            _sharedUpdateData->_version, _sharedUpdateData->_lock.__data.__nr_readers);
        if (cnt == 190)
        {  
            exit(0);
        }  
        sleep(1);
        pthread_rwlock_unlock( &(_sharedUpdateData->_lock));
        -- cnt;
        usleep(100*1000);
    }
    delete _mmapFile;
}

(! 2293)-> cat test/write_shared.cpp

#include

SharedUpdateData*   _sharedUpdateData = NULL;
cm_sub::CMMapFile*  _mmapFile = NULL;

int32_t initSharedMemWrite(const char* mmap_file_path)
{
    _mmapFile = new (std::nothrow) cm_sub::CMMapFile();
    if ( _mmapFile == NULL || !_mmapFile->open(mmap_file_path, FILE_OPEN_WRITE, 1024) )
    {
        return -1;
    }
    _sharedUpdateData = (SharedUpdateData *)_mmapFile->offset2Addr(0);
    madvise(_sharedUpdateData, 1024, MADV_SEQUENTIAL);

    pthread_rwlockattr_t attr;
    memset(&attr, 0x0, sizeof(pthread_rwlockattr_t));
    if (pthread_rwlockattr_init(&attr) != 0 || pthread_rwlockattr_setpshared(&attr, PTHREAD_PROCESS_SHARED) != 0)
    {
        return -1;
    }
    pthread_rwlock_init( &(_sharedUpdateData->_lock), &attr);
    _sharedUpdateData->_updateTime = autil::TimeUtility::currentTime();
    _sharedUpdateData->_version = 0;
    return 0;
}

int main()
{
    if (initSharedMemWrite("data.mmap") != 0) return -1;

    int cnt = 200;
    while (cnt > 0)
    {
        pthread_rwlock_wrlock( &(_sharedUpdateData->_lock));
        ++ _sharedUpdateData->_version;
        fprintf(stdout, "version = %ld, readers = %u\n",
                _sharedUpdateData->_version, _sharedUpdateData->_lock.__data.__nr_readers);
        sleep(1);
        pthread_rwlock_unlock( &(_sharedUpdateData->_lock));
        -- cnt;
        usleep(100*1000);
    }
    delete _mmapFile;
}

无论是读进程还是写进程,获取锁后来不及释放就挂掉都会有这样的问题

如何解决

问题已经复现,想想如何用一个好的办法解决,在网上找了一遍,针对读写锁没有什么好的解决办法,只能在逻辑上自己解决,能想到的是使用超时机制,即写进程内部增加一个超时时间,如果写进程到了这个时间还是不能获得锁,就认为死锁,将读进程的计数减1,这是一个暴力的解决办法,不解释了,如果谁有好的解决办法指导我下

看下读写锁的代码,读写锁和互斥锁相比,更适合用在读多写少的场景,如果读进程需要锁住时间久,就更合适使用读写锁了,我的应该场景是,读多写少,读写时间都非常短;暂时认为互斥锁和读写锁性能差别应该不大,其实读写锁内部同样使用了互斥锁,只不过是锁的时间比较短,锁住互斥区,进去看下是否有人正在写,然后就释放了,
需要注意的是,读写锁默认是写优先的,也就是说当正在写,或者进入写队列准备写时,读锁都是加不上的,需要等待

好,那我们看看互斥锁能否解决我们的问题,互斥锁内部有一个属性叫Robust锁

设置锁为Robust锁: pthread_mutexattr_setrobust_np

        The robustness attribute defines the behavior when the owner
    of  a  mutex  dies.  The value of robustness could be either
    PTHREAD_MUTEX_ROBUST_NP or  PTHREAD_MUTEX_STALLED_NP,  which
    are  defined by the header <pthread.h>. The default value of
    the robustness attribute is PTHREAD_MUTEX_STALLED_NP.

        When the owner of a mutex with the  PTHREAD_MUTEX_STALLED_NP
    robustness    attribute    dies,   all   future   calls   to
    pthread_mutex_lock(3C) for this mutex will be  blocked  from
    progress in an unspecified manner.

修复非一致的Robust锁: pthread_mutex_consistent_np

        A consistent mutex becomes inconsistent and is  unlocked  if
    its  owner dies while holding it, or if the process contain-
    ing the owner of the mutex unmaps the memory containing  the
    mutex or performs one of the exec(2) functions. A subsequent
    owner  of  the   mutex   will   acquire   the   mutex   with
    pthread_mutex_lock(3C),  which  will  return  EOWNERDEAD  to
    indicate that the acquired mutex is inconsistent.

        The pthread_mutex_consistent_np() function should be  called
    while  holding  the  mutex  acquired  by  a previous call to
    pthread_mutex_lock() that returned EOWNERDEAD.

        Since the critical section protected by the mutex could have
    been  left  in  an inconsistent state by the dead owner, the
    caller should make the mutex consistent only if it  is  able
    to  make  the  critical  section protected by the mutex con-
    sistent.

简单来说就是当发现EOWNERDEAD时,pthread_mutex_consistent_np函数内部会判断这个互斥锁是不是Robust锁,如果是,并且他OwnerDie了,那么他会把锁的owner设置成自己的进程ID,这样这个锁又可以恢复可用,很简单吧

锁释放是可以解决了,但是通过共享内存在进程间共享数据时,还有一点是需要注意的,就是数据的正确性,即完整性,进程共享不同与线程,如果是一个进程中的多个线程,那么进程异常退出了,其他线程也同时退出了,进程间共享都是独立的,如果一个写线程在写共享数据的过程中,异常退出,导致写入的数据不完整,读进程读取时就会有读到不完整数据的问题,其实数据完整性非常好解决,只需要在共享内存中加一个完成标记就好了,锁住共享区后,写数据,写好之后标记为完成,就可以了,读进程在读取时判断一下完成标记

测试代码见:

(! 2295)-> cat test/read_shared_mutex.cpp

 #include

 SharedUpdateData*   _sharedUpdateData = NULL;
 cm_sub::CMMapFile*  _mmapFile = NULL;

 int32_t initSharedMemRead(const std::string& mmap_file_path)
 {
    _mmapFile = new (std::nothrow) cm_sub::CMMapFile();
    if (_mmapFile == NULL || !_mmapFile->open(mmap_file_path.c_str(), FILE_OPEN_WRITE) )
    {
        return -1;
    }
    _sharedUpdateData = (SharedUpdateData*)_mmapFile->offset2Addr(0);
    return 0;
 }

 int main(int argc, char** argv)
 {
     if (argc != 2) return -1;
     if (initSharedMemRead(argv[1]) != 0) return -1;   

     int cnt = 10000;
     int ret = 0;
     while (cnt > 0)
     {
         ret = pthread_mutex_lock( &(_sharedUpdateData->_lock));
         if (ret == EOWNERDEAD)
         {
             fprintf(stdout, "%s: version = %ld, lock = %d, %u, %d\n",
                strerror(ret),
                _sharedUpdateData->_version,
                _sharedUpdateData->_lock.__data.__lock,
                _sharedUpdateData->_lock.__data.__count,
                _sharedUpdateData->_lock.__data.__owner);
             ret = pthread_mutex_consistent_np( &(_sharedUpdateData->_lock));
             if (ret != 0)
             {
                 fprintf(stderr, "%s\n", strerror(ret));
                 pthread_mutex_unlock( &(_sharedUpdateData->_lock));
                 continue;
             }
         }
         fprintf(stdout, "version = %ld, lock = %d, %u, %d\n",
            _sharedUpdateData->_version,
            _sharedUpdateData->_lock.__data.__lock,
            _sharedUpdateData->_lock.__data.__count,
            _sharedUpdateData->_lock.__data.__owner);
         sleep(5);
         pthread_mutex_unlock( &(_sharedUpdateData->_lock));
         usleep(500*1000);
         -- cnt;
    }
    fprintf(stdout, "go on\n");
    delete _mmapFile;
 }

(! 2295)-> cat test/write_shared_mutex.cpp

#include

SharedUpdateData*   _sharedUpdateData = NULL;
cm_sub::CMMapFile*  _mmapFile = NULL;

int32_t initSharedMemWrite(const char* mmap_file_path)
{
    _mmapFile = new (std::nothrow) cm_sub::CMMapFile();
    if ( _mmapFile == NULL || !_mmapFile->open(mmap_file_path, FILE_OPEN_WRITE, 1024) )
    {
        return -1;
    }
    _sharedUpdateData = (SharedUpdateData *)_mmapFile->offset2Addr(0);
    madvise(_sharedUpdateData, 1024, MADV_SEQUENTIAL);

    pthread_mutexattr_t attr;
    memset(&attr, 0x0, sizeof(pthread_mutexattr_t));
    if (pthread_mutexattr_init(&attr) != 0 || pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED) != 0)
    {
        return -1;
    }
    if (pthread_mutexattr_setrobust_np(&attr, PTHREAD_MUTEX_ROBUST_NP) != 0)
    {
        return -1;
    }
    pthread_mutex_init( &(_sharedUpdateData->_lock), &attr);
    _sharedUpdateData->_version = 0;
    return 0;
}

int main()
{
    if (initSharedMemWrite("data.mmap") != 0) return -1;

    int cnt = 200;
    int ret = 0;
    while (cnt > 0)
    {
        ret = pthread_mutex_lock( &(_sharedUpdateData->_lock));
        if (ret == EOWNERDEAD)
        {
            fprintf(stdout, "%s: version = %ld, lock = %d, %u, %d\n",
                    strerror(ret),
                    _sharedUpdateData->_version,
                    _sharedUpdateData->_lock.__data.__lock,
                    _sharedUpdateData->_lock.__data.__count,
                                            _sharedUpdateData->_lock.__data.__owner);
            ret = pthread_mutex_consistent_np( &(_sharedUpdateData->_lock));
            if (ret != 0)
            {
                fprintf(stderr, "%s\n", strerror(ret));
                pthread_mutex_unlock( &(_sharedUpdateData->_lock));
                continue;
            }
        }
        ++ _sharedUpdateData->_version;
        fprintf(stdout, "version = %ld, lock = %d, %u, %d\n", _sharedUpdateData->_version,
                _sharedUpdateData->_lock.__data.__lock,
                _sharedUpdateData->_lock.__data.__count,
                _sharedUpdateData->_lock.__data.__owner);
        usleep(1000*1000);
        pthread_mutex_unlock( &(_sharedUpdateData->_lock));
        -- cnt;
        usleep(500*1000);
    }

    delete _mmapFile;
}

BTW:我们都知道加锁是有开销的,不仅仅是互斥导致的等待开销,还有加锁过程都是有系统调用到内核态的,这个过程开销也很大,有一种互斥锁叫Futex锁(Fast User Mutex),Linux从2.5.7版本开始支持Futex,快速的用户层面的互斥锁,Fetux锁有更好的性能,是用户态和内核态混合使用的同步机制,如果没有锁竞争的时候,在用户态就可以判断返回,不需要系统调用,

当然任何锁都是有开销的,能不用尽量不用,使用双Buffer,释放链表,引用计数,都可以在一定程度上替代锁的使用

时间: 2024-09-29 22:30:01

进程间共享内存 由于某个进程异常退出导致死锁的相关文章

C++进程间共享数据实例_C 语言

本文实例讲述了C++进程间共享数据的实现方法,分享给大家供大家参考.具体实现方法如下: 复制代码 代码如下: int main(int argc, char *argv[])  {      //RecursiveDelete("C:\\20_128\\");      //SelfRun("runModel");      //进程间内存共享      LPCTSTR lpName= "hello";      LPCTSTR lpConten

线程-有几百个进程却只有几G的内存,求解释这几百个进程如何共享内存的?

问题描述 有几百个进程却只有几G的内存,求解释这几百个进程如何共享内存的? 我感觉每个进程"占的内存"加起来应该大于物理内存,,怎样的内存管理和调度可以让这几百个进程看起来是一块运行的啊? 解决方案 同一个物理页可以映射到不同进程的地址空间,比如如果多个版本的程序都用同一个Visual C++运行时的DLL,那么只读数据是共享的.一个进程的私有内存才是说占就真占的. 另外,系统会把暂时不用的内存(比如窗口正在被最小化的程序的内存)交换到磁盘上的虚拟内存交换文件,直到下次程序想起来用(比

Python multiprocessing.Manager介绍和实例(进程间共享数据)_python

Python中进程间共享数据,处理基本的queue,pipe和value+array外,还提供了更高层次的封装.使用multiprocessing.Manager可以简单地使用这些高级接口. Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问.从而达到多进程间数据通信且安全. Manager支持的类型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaph

在驱动和应用程序间共享内存

在不同的场合,很多驱动编写人员需要在驱动和用户程序间共享内存.两种最容易的技术是:    l 应用程序发送IOCTL给驱动程序,提供一个指向内存的指针,之后驱动程序和应用程序就可以共享内存.(应用程序分配共享内存)    l 由驱动程序分配内存页,并映射这些内存页到指定用户模式进程的地址空间,并且将地址返回给应用程序.(驱动程序分配共享内存)    使用IOCTL共享Buffer:    使用一个IOCT描述的Buffer,在驱动和用户程序间共享内存是内存共享最简单的实现形式.毕竟,IOCTL也

Linux的共享内存及内存映射

一.POSIX共享内存的实现 共享内存是在进程间共享某一块内存.是最快一种ipc通信机构.其中posix共享内存机制 它主要是通过内存映射(mmap)机制来实现的. 在进程间共享内存使用如下固定步骤: 1.创建一个共享内存 int shm_open(const char *name, int oflag, mode_t mode); name是共享内存名字,各个进程通过名字来找到同一块内存. oflag,是这个内存属性.类似于文件属性.使用O_RDWR/O_RDONLY/O_CREAT,第一次创

Posix共享内存区

1.概述 Posix提供了两种在无亲缘关系进程间共享内存区的方法: (1)内存映射文件:先有open函数打开,然后调用mmap函数把得到的描述符映射到当前进程地址空间中的一个文件(上一篇笔记所用到的就是). (2)共享内存区对象:先有shm_open打开一个Posix IPC名字(也可以是文件系统中的一个路径名),然后调用mmap将返回的描述符映射到当前进程的地址空间. 者两种方法多需要调用mmap,差别在于作为mmap的参数之一的描述符的获取手段. 2.Posix共享内存区对象 Posix共享

win32下进程间通信(共享内存)实例分析_C 语言

一.概述 很多情况下在Windows程序中,各个进程之间往往需要交换数据,进行数据通讯.WIN32 API提供了许多函数使我们能够方便高效的进行进程间的通讯,通过这些函数我们可以控制不同进程间的数据交换. 进程间通讯(即:同机通讯)和数据交换有多种方式:消息.共享内存.匿名(命名)管道.邮槽.Windows套接字等多种技术."共享内存"(shared memory)可以定义为对一个以上的进程是可见的内存或存在于多个进程的虚拟地址空间.例如:如果两个进程使用相同的DLL,只把DLL的代码

Linux 进程间通讯共享内存方式

共享内存方式:从物理内存里面拿出来一部分作为多个进程共享. 共享内存是进程间共享数据的一种最快的方法,一个进程向共享内存区域写入数据,共享这个内存的所有进程都可以立即看到其中内容. 共享内存实现步骤: 一.创建共享内存,使用shmget函数. 二.映射共享内存,将这段创建的共享内存映射到具体的进程空间去,使用shmat函数. 创建共享内存shmget: intshmget(key_t key, size_t size, int shmflg) 功能:得到一个共享内存标识符或创建一个共享内存对象并

两个进程间能使用共享内存段进行数据共享吗?

问题描述 两个进程间能使用共享内存段进行数据共享吗? 我是这样做的: 在一个dll里面定义一个共享数据段,里面定义一个整形数据,然后定义两个导出函数对这个整形数据进行操作. #pragma data_seg ("ShareDataSeg") //开辟一个名字为"ShareDataSeg"的共享内存段 int g_nShareInt = 0; #pragma data_seg() //共享内存段"ShareDataSeg"定义结束 //加上这条声明