Redis代码阅读3--Redis网络监听(2)



这篇文章接上一篇,主要介绍Redis网络监听流程的各个步骤。

  1. aeCreateEventLoop :创建用于循环监听的 eventLoop
    , Redis 支持主流的三种事件触发机制: select ,epoll, kqueue,
    可以通过在 config.h 里面配置 HAVE_EPOLL/ HAVE_KQUEUE
    来根据不同的操作系统选择合适的机制:调用 ae_epoll.c/ae_select.c/ae_kqueue.c中的 aeApiCreate;创建
    eventLoop 的时候没有指定 beforesleep
    ,在开始循环监听前将函数 beforeSleep 绑定到 eventLoop
    上,该函数也放在后面介绍

    C代码  

    1. /* test for polling API */  
    2. #ifdef __linux__  
    3. #define HAVE_EPOLL 1  
    4. #endif  
    5.   
    6. #if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)  
    7. #define HAVE_KQUEUE 1  
    8. #endif  
    /* test for polling API */
    #ifdef __linux__
    #define HAVE_EPOLL 1
    #endif
    
    #if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
    #define HAVE_KQUEUE 1
    #endif
    
  2. aeCreateTimeEvent/:创建定时事件,注册了定时时间函数 serverCron,作用放到后面介绍;
  3. aeCreateFileEvent:注册了一个读 I/O事件,绑定了函数 acceptTcpHandler(同样也放到后面介绍 ),如果多路复用采用epoll机制的话,这采用LT模式进行触发;
  4. 创建好server.ae后,通过aeMain这个方法开始网络监听;此处的代码是:

    C代码  

    1. void aeMain(aeEventLoop *eventLoop) {  
    2.     eventLoop->stop = 0;  
    3.     while (!eventLoop->stop) {  
    4.         if (eventLoop->beforesleep != NULL)  
    5.             eventLoop->beforesleep(eventLoop);  
    6.         aeProcessEvents(eventLoop, AE_ALL_EVENTS);  
    7.     }  
    8. }  
    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
        while (!eventLoop->stop) {
            if (eventLoop->beforesleep != NULL)
                eventLoop->beforesleep(eventLoop);
            aeProcessEvents(eventLoop, AE_ALL_EVENTS);
        }
    }

     在每次处理事件之前,都要执行一遍server.ae中设定的beforesleep方法,下面就介绍下beforesleep;

  5. Beforesleep:顾名思义,这个方法在Redis每次进入sleep/wait去等待监听的端口发生I/O事件之前被调用(这话太拗口了。。。。),还是来看代码:

    C代码  

    1. void beforeSleep(struct aeEventLoop *eventLoop) {  
    2.     REDIS_NOTUSED(eventLoop);  
    3.     listNode *ln;  
    4.     redisClient *c;  
    5.   
    6.     /* Awake clients that got all the swapped keys they requested */  
    7.     if (server.vm_enabled && listLength(server.io_ready_clients)) {  
    8.         listIter li;  
    9.   
    10.         listRewind(server.io_ready_clients,&li);  
    11.         while((ln = listNext(&li))) {  
    12.             c = ln->value;  
    13.             struct redisCommand *cmd;  
    14.   
    15.             /* Resume the client. */  
    16.             listDelNode(server.io_ready_clients,ln);  
    17.             c->flags &= (~REDIS_IO_WAIT);  
    18.             server.vm_blocked_clients--;  
    19.             aeCreateFileEvent(server.el, c->fd, AE_READABLE,  
    20.                 readQueryFromClient, c);  
    21.             cmd = lookupCommand(c->argv[0]->ptr);  
    22.             redisAssert(cmd != NULL);  
    23.             call(c,cmd);  
    24.             resetClient(c);  
    25.             /* There may be more data to process in the input buffer. */  
    26.             if (c->querybuf && sdslen(c->querybuf) > 0)  
    27.                 processInputBuffer(c);  
    28.         }  
    29.     }  
    30.   
    31.     /* Try to process pending commands for clients that were just unblocked. */  
    32.     while (listLength(server.unblocked_clients)) {  
    33.         ln = listFirst(server.unblocked_clients);  
    34.         redisAssert(ln != NULL);  
    35.         c = ln->value;  
    36.         listDelNode(server.unblocked_clients,ln);  
    37.         c->flags &= ~REDIS_UNBLOCKED;  
    38.   
    39.         /* Process remaining data in the input buffer. */  
    40.         if (c->querybuf && sdslen(c->querybuf) > 0)  
    41.             processInputBuffer(c);  
    42.     }  
    43.   
    44.     /* Write the AOF buffer on disk */  
    45.     flushAppendOnlyFile();  
    46. }  
    void beforeSleep(struct aeEventLoop *eventLoop) {
        REDIS_NOTUSED(eventLoop);
        listNode *ln;
        redisClient *c;
    
        /* Awake clients that got all the swapped keys they requested */
        if (server.vm_enabled && listLength(server.io_ready_clients)) {
            listIter li;
    
            listRewind(server.io_ready_clients,&li);
            while((ln = listNext(&li))) {
                c = ln->value;
                struct redisCommand *cmd;
    
                /* Resume the client. */
                listDelNode(server.io_ready_clients,ln);
                c->flags &= (~REDIS_IO_WAIT);
                server.vm_blocked_clients--;
                aeCreateFileEvent(server.el, c->fd, AE_READABLE,
                    readQueryFromClient, c);
                cmd = lookupCommand(c->argv[0]->ptr);
                redisAssert(cmd != NULL);
                call(c,cmd);
                resetClient(c);
                /* There may be more data to process in the input buffer. */
                if (c->querybuf && sdslen(c->querybuf) > 0)
                    processInputBuffer(c);
            }
        }
    
        /* Try to process pending commands for clients that were just unblocked. */
        while (listLength(server.unblocked_clients)) {
            ln = listFirst(server.unblocked_clients);
            redisAssert(ln != NULL);
            c = ln->value;
            listDelNode(server.unblocked_clients,ln);
            c->flags &= ~REDIS_UNBLOCKED;
    
            /* Process remaining data in the input buffer. */
            if (c->querybuf && sdslen(c->querybuf) > 0)
                processInputBuffer(c);
        }
    
        /* Write the AOF buffer on disk */
        flushAppendOnlyFile();
    }

     这个方法做了三件事情:
    I.    如果Redi开启了Virtual memory,那么某些clients请求的keys可能因为被swap了,因此这些client会被block住,当这些clients请求的keys又被swap到内存中时,则这些被block住的clients应该unblock,然后被处理;io_ready_clients就是用来维护这些clients的,为了尽快响应client的请求,因此在每次sleep前都先处理这些请求
    II.    某些Redis操作是blocking的,如BLPOP,那么执行这些操作的clients可能会被block住,unblocked_clients这个list就是用来维护那些刚被unblock的clients,如果这个list不为空,则也要尽快响应这些clients
    III.    flushAppendOnlyFile;因为clients的Socket的write只能在eventLoop里面进行,而flushAppendOnlyFile又是在每次sleep之前被调用,所以在eventLoop里面的所有AOF writes都是先写到内存里的一块buffer里面,flushAppendOnlyFile则负责把这个buffer内容flush到disk;

  6. 执行完beforesleep后aeprocessEvents,该方法主要是处理各种监听到的文件读写事件和到期响应的定时事件,因为这个方法的代码比较长,而且逻辑简单,就不贴过来了,简单介绍下过程:                                      a)    首先通过遍历eventLoop中注册的timeEvent找出离当前最近timeEvent(即shortest)。
    b)    调用epoll_wait()方法,等待I/O事件的发生, 为了尽快响应时间事件,epoll_wait()方法的等待时间为shortest与当前时间的差值,如果该差值小于零,则epoll_wait()轮询至有I/O事件发生;
    c)    响应eventLoop中fired的aeFileEvent,这里调用的就是之前设置的文件处理函数acceptTcpHandler。
    d)    响应完I/O事件后,则通过timeEventHead遍历timeEvent,逐一响应timeProc--serverCron。在响应定时事件的时候 需要注意几点点:

    C代码  

    1. static int processTimeEvents(aeEventLoop *eventLoop) {  
    2.     int processed = 0;  
    3.     aeTimeEvent *te;  
    4.     long long maxId;  
    5.   
    6.     te = eventLoop->timeEventHead;  
    7.    <span style="color: rgb(255, 0, 0);"> maxId = eventLoop->timeEventNextId-1;</span>  
    8.   
    9.   
    10.   
    11.   
    12.     while(te) {  
    13.         long now_sec, now_ms;  
    14.         long long id;  
    15.   
    16.       <span style="color: rgb(255, 0, 0);">  if (te->id > maxId) </span>  
    17.   
    18.   
    19.   
    20. {  
    21.             te = te->next;  
    22.             continue;  
    23.         }  
    24.         aeGetTime(&now_sec, &now_ms);  
    25.         if (now_sec > te->when_sec ||  
    26.             (now_sec == te->when_sec && now_ms >= te->when_ms))  
    27.         {  
    28.             int retval;  
    29.   
    30.             id = te->id;  
    31.             retval = te->timeProc(eventLoop, id, te->clientData);  
    32.             processed++;  
    33.             /* After an event is processed our time event list may 
    34.              * no longer be the same, so we restart from head. 
    35.              * Still we make sure to don't process events registered 
    36.              * by event handlers itself in order to don't loop forever. 
    37.              * To do so we saved the max ID we want to handle. 
    38.              * 
    39.              * FUTURE OPTIMIZATIONS: 
    40.              * Note that this is NOT great algorithmically. Redis uses 
    41.              * a single time event so it's not a problem but the right 
    42.              * way to do this is to add the new elements on head, and 
    43.              * to flag deleted elements in a special way for later 
    44.              * deletion (putting references to the nodes to delete into 
    45.              * another linked list). */  
    46.           <span style="color: rgb(255, 0, 0);">  if (retval != AE_NOMORE) {  
    47.                 aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);  
    48.             } else {  
    49.                 aeDeleteTimeEvent(eventLoop, id);  
    50.             }</span>  
    51.   
    52.   
    53.   
    54.   
    55.            <span style="color: rgb(255, 0, 0);"> </span>  
    56.   
    57.   
    58.   
    59. <span style="color: rgb(255, 0, 0);"><span style="background-color: rgb(255, 255, 255);">te = eventLoop->timeEventHead;</span>  
    60.   
    61.   
    62.   
    63. </span>  
    64.   
    65.   
    66.   
    67.   
    68.         } else {  
    69.             te = te->next;  
    70.         }  
    71.     }  
    72.     return processed;  
    73. }  
    static int processTimeEvents(aeEventLoop *eventLoop) {
        int processed = 0;
        aeTimeEvent *te;
        long long maxId;
    
        te = eventLoop->timeEventHead;
       <span style="color: rgb(255, 0, 0);"> maxId = eventLoop->timeEventNextId-1;</span>
    
        while(te) {
            long now_sec, now_ms;
            long long id;
    
          <span style="color: rgb(255, 0, 0);">  if (te->id > maxId) </span>
    
    {
                te = te->next;
                continue;
            }
            aeGetTime(&now_sec, &now_ms);
            if (now_sec > te->when_sec ||
                (now_sec == te->when_sec && now_ms >= te->when_ms))
            {
                int retval;
    
                id = te->id;
                retval = te->timeProc(eventLoop, id, te->clientData);
                processed++;
                /* After an event is processed our time event list may
                 * no longer be the same, so we restart from head.
                 * Still we make sure to don't process events registered
                 * by event handlers itself in order to don't loop forever.
                 * To do so we saved the max ID we want to handle.
                 *
                 * FUTURE OPTIMIZATIONS:
                 * Note that this is NOT great algorithmically. Redis uses
                 * a single time event so it's not a problem but the right
                 * way to do this is to add the new elements on head, and
                 * to flag deleted elements in a special way for later
                 * deletion (putting references to the nodes to delete into
                 * another linked list). */
              <span style="color: rgb(255, 0, 0);">  if (retval != AE_NOMORE) {
                    aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
                } else {
                    aeDeleteTimeEvent(eventLoop, id);
                }</span>
    
               <span style="color: rgb(255, 0, 0);"> </span>
    
    <span style="color: rgb(255, 0, 0);"><span style="background-color: rgb(255, 255, 255);">te = eventLoop->timeEventHead;</span>
    
    </span>
    
            } else {
                te = te->next;
            }
        }
        return processed;
    }

     

    •  因为响应一个定时事件后,eventLoop里面的定时事件链表可能会改变了,所以又要从头结点开始遍历定时事件链表;
    • 因为每次都要从头结点开始遍历定时事件链表,因此要考虑如何避免响应循环调用,即在响应定时事件a时,如果a的处理函数timeProc中又register了新的定时事件b,如果响应完事件a后,又响应b的话,那么就会造成循环响应。为了解决这个情况,redis在eventLoop里维护了一个timeEventNextId,即下一个定时事件的id,比如当前eventLoop的只有一个timeEvent  a,那么timeEventNextId=2,a->id = 1当a的timeProc方法又注册了timeEvent 
      b,那么timeEventNextId = 3,b->id = 2.那么在redis在遍历定时事件开始的时候将遍历前的eventLoop里面的maxId= timeEventNextId-1保存起来,在遍历定时事件的时候,如果某个timeEvent->id >maxId,则跳过这个事件。
    • 作者也意识到了每次都从头结点开始遍历定时事件不是一个好的算法,但是由于目前Redis里面只有一个定时事件,所以目前对redis来说不是个问题,但是作者也提到在未来的版本会对此进行改进   
  7. acceptTcpHandler:这个方法主要是监听网络端口:                                                                             i.    通过调用anetTcpAccept方法获得监听端口上的client connection;
    ii.    然后调用acceptCommonHandler创建redisClient对象,如果当前连接的client的数量大于配置的最大client数量,则拒绝当前连接,并返回” max number of clients reached”提示信息;
    iii.    调用createClient方法创建redisClient,同时注册新的fileEvent(AE_READABLE),并绑定处理函数为readQueryFromClient;

    C代码  

    1. redisClient *createClient(int fd) {  
    2.     redisClient *c = zmalloc(sizeof(redisClient));  
    3.     c->bufpos = 0;  
    4.   
    5.     anetNonBlock(NULL,fd);  
    6.     anetTcpNoDelay(NULL,fd);  
    7.     if (!c) return NULL;  
    8.     <span style="color: rgb(255, 0, 0);">if (aeCreateFileEvent(server.el,fd,AE_READABLE,  
    9.         readQueryFromClient, c) == AE_ERR)</span>  
    10.   
    11.   
    12.   
    13.   
    14.     {  
    15.         close(fd);  
    16.         zfree(c);  
    17.         return NULL;  
    18.     }  
    19.     .........  
    20. }  
    redisClient *createClient(int fd) {
        redisClient *c = zmalloc(sizeof(redisClient));
        c->bufpos = 0;
    
        anetNonBlock(NULL,fd);
        anetTcpNoDelay(NULL,fd);
        if (!c) return NULL;
        <span style="color: rgb(255, 0, 0);">if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)</span>
    
        {
            close(fd);
            zfree(c);
            return NULL;
        }
        .........
    }

      iv.    readQueryFromClient:从指定的Socket中读取client发送过来的数据,并按照Redis的协议(后面将单独介绍)进行解析组装成Redis的各个command,然后通过查找commandTable,执行command

  8.  _installWriteEvent:上面介绍的文件事件都是AE_READABLE事件,但Redis在执行完client请求后的命令后,向Client端return数据,就是往Socket写入数据,这使一个AE_ AE_WRITABLE事件。Redis执行完command后,调用addReply方法,然后在这个方法里面调用installWriteEvent来注册一个AE_WRITABLE事件,并绑定事件处理函数sendReplyToClient,用来把数据发送到client。
  9. serverCron: 介绍完fileEvent的处理函数后,最后我们来介绍timeEvent的处理函数。顾名思义,serverCron就是Redis Server的定时计划任务。这个方法比较复杂,处理的事情也很多,主要集中在记录Redis的运行情况(memory,clients等),AOF write, VM Swap和BGSAVE等和Redis正常运行息息相关的事项。这个方法的代码很多,因此单独介绍
时间: 2024-08-01 07:42:21

Redis代码阅读3--Redis网络监听(2)的相关文章

Redis代码阅读3--Redis网络监听(3)

 是介绍Redis网络监听的最后一篇文章,着重分析定时时间处理函数serverCron,这个函数其实已经和网络监听没多大关系了,当时因为其绑定在Redis自定义的事件库的定时事件上,所以放到一起来讲.serverCron的这个函数对Redis的正常运行来说很重要,对于Redis的使用者来说,最重要的就是能够迅速直观地看到Redis的当前的运行状况(keys,sizes,memory等),serverCron就能够使用户得知这些信息,此外,serverCron这个方法定时周期地运行,还承担了A

Redis代码阅读1--Redis启动原理

 前面写了一篇文章简单介绍Redis的list结构.再写完之后,我觉得有必要熟悉Redis的启动过程和如何读取Redis的命令,因此本文将通过分析代码来介绍Redis的启动过程,通过查看Redis 的启动脚本,得知Redis的启动时从Redis.c的main方法开始的.Redis启动可以分为以下几个步骤: 初始化Redis服务器全局配置 重置服务器Save参数(具体下文详解)和加载配置文件 初始化服务器 加载数据库 开始网络监听    一,初始化Redis服务器全局配置.这一步骤主要是主要是

基于VC实现的网络监听功能程序实例_C 语言

本文所述VC++网络监听器代码,可以实现监听网络连接所使用的协议.源IP地址.目标IP地址等信息的功能,并且能把数据内容绑定到网格控件中显示.具体功能代码部分如下所示: //线程函数 UINT ThreadFun( LPVOID pParam ) { CSniffAppDlg* pDlg = static_cast<CSniffAppDlg*>(pParam); MSG msg; char buffer[1000],sourceip[32] ,*tempbuf; char *ptemp; BY

用原始套接字实现网络监听

1.引言 网络监听工具(sinff)是提供给网络管理员的一类管理工具.在以太网中(Ethernet),当网络上连接多台计算机时,某瞬间只能有一台计算机可以传送数据.以太网中,数据是以被称为帧的数据结构为单位进行交换的.通常,在计算机网络上交换的数据结构体的单位是数据包.而在以太网中则称为帧.这种数据包是由记录着数据包发送给对方所必需信息的报头部分和记录着发送信息的报文部分构成.报头部分包含接收端的地址.发送端的地址.数据校验码等信息. 在以太网上,帧(数据包)是被称为带碰撞检测的载波侦听多址访问

如何防止网络监听与端口扫描

1.使用安全工具 有许多工具可以让我们发现系统中的漏洞,如SATAN等.SATAN是一个分析网络的管理.测试和报告许多信息,识别一些与网络相关的安全问题. 对所发现的问题,SATAN提供对这个问题的解释以及可能对系统和网络安全造成影响的程度,并且通过工具所附的资料,还能解释如何处理这些问题. 当然还有很多像这样的安全工具.包括对TCP端口的扫描或者对多台主机的所有TCP端口实现监听:分析网络协议.监视控制多个网段等,正确使用这些安全工具,及时发现系统漏洞,才能防患于未然. 而对于WindowsN

怎样防御网络监听

  尽管网络监听看上去无所不能,但是它也有一个致命弱点,就是只能作为第二波攻击手段,黑客必须已经侵入一台主机才能安放网络监听工具.而且只有在网段内部才会有广播数据,而网段之间是不会有广播数搪的,所以网络监听的局限性在于必须把它放在目标网段上. 所以对策是:尽量敢好安全防范工作,防止黑客侵人,这样就从源头堵住了网络监听的危害,另外如果xp系统中被安放了网络监听,那也会有蛛丝马迹可循.比如说网速变慢,发出的数据包无法总是被目标主机接受,你可以发Ping验证,如果经常收不到目标主机的回应,那有可能就是

UNIX系统管理:网络监听概念

网络监听工具的提供给管理员的一类管理工具.使用这种工具,可以监视网络的状态.数据流动情况以及网络上传输的信息. 但是网络监听工具也是黑客们常用的工具.当信息以明文的形式在网络上传输时,便可以使用网络监听的方式来进行攻击.将网络接口设置在监听模式,便可以源源不断地将网上传输的信息截获. 网络监听可以在网上的任何一个位置实施,如局域网中的一台主机.网关上或远程网的调制解调器之间等.黑客们用得最多的是截获用户的口令. 1什么是网络监听 网络监听是黑客们常用的一种方法.当成功地登录进一台网络上的主机,并

DTCC2013:基于网络监听数据库安全审计

本文讲的是DTCC2013:基于网络监听数据库安全审计,2013年4月18-20日,第四届中国数据库技术大会(DTCC 2013)在北京福朋喜来登酒店拉开序幕.在为期三天的会议中,大会将围绕大数据应用.数据架构.数据管理(数据治理).传统数据库软件等技术领域展开深入探讨,并将邀请一批国内顶尖的技术专家来进行分享.本届大会将在保留数据库软件应用实践这一传统主题的基础上,向大数据.数据结构.数据治理与分析.商业智能等领域进行拓展,以满足于广大从业人士和行业用户的迫切需要. 自2010年以来,国内领先

数据中心面对的网络监听技术

数据中心里最宝贵的就是数据,这些数据里隐含着很多私有的.机密信息,小到个人隐私,大到国家安全,所以保护数据是数据中心最为关键的任务,数据一旦被窃取被泄露,给数据中心带来的损失无法估计.然而,这些数据在数据中心里以及外部并不是静止的,躺在存储硬盘里睡大觉,而是通过网络在不断传递和变化着,网络成为数据传递的最为重要通道,无论是数据中心内部还是外部.对网络进行监听,就可以掌握数据的基本信息和特征,听起来网络监听这个词语贬义成分居多.而实际上,对网络监听对于数据中心管理非常重要.不过的确是凡事都有其两面