MaxCompute(原ODPS) 事件(Event)机制

免费开通大数据服务:https://www.aliyun.com/product/odps

转自habai

什么是 ODPS 事件机制

Odps event 用于监控表和实例等odps资源(目前只用于监控表)。当表状态发生变化时,odps 会向预先注册(订阅)的 uri 发送信息。Event通知只有在订阅Event之后才能收到。每个project中可以订阅一个或多个Event。Event是用户的数据,同表数据一样,创建或修改时都需要有这个Project的操作权限。关于Event的Restful Api,在文章[1]里有介绍。

为什么需要 ODPS 事件机制

考虑以下场景:当一个用户 A 关心某一个表 T 的操作(创建/删除/插入/修改/...)时,如果表 T 不是用户 A 创建的,那么用户 A 可以采用什么方法感知这个操作?一个方法是主动轮询这个表是否做了某个操作,但是缺点是不言而喻的。另一个方法是,注册一个回调,当表被操作时,被动接受通知。用这种方法可以使用户逻辑不必轮询和等待对表的操作。ODPS Event机制就是第二种方法的实现。
在实际的生产中,对以上应用场景有大量的需求,并已经形成了对Odps Event丰富的应用,例如:

  • 数据地图: 订阅了一些 project 的 Event,并根据 Event 通知展示这些 project 中表的元数据。
  • 跨集群复制: 监听 Event 通知以复制相应的表。
  • 蚂蚁金服: 依赖事件通知机制进行工作流管理,统计,授权等工作。 事实上,每个 project 都有大量用户订阅了所属project的表以及其它project表的事件通知。

ODPS 事件机制是怎样实现的

本节首先将 ODPS 事件机制 作为一个黑盒,从用户的角度介绍其功能和使用方法。而后以此为切入点,深入剖析 ODPS 事件机制的内部机理。最后,提出一些对当前事件机制的思考。

订阅(注册)一个事件 & 事件通知

在网络编程中,为了减轻多线程的压力,往往使用事件通知驱动的异步编程。如,libevent[2]。使用这个库编写一个服务器程序,可以这样做:

void on_accept(int sock, short event, void* arg);

int main(int argc, char* argv[])
{
    // create socket s
    struct sockadddr_in addrin;
    int s = socket(AF_INET, SOCK_STREAM, 0);
    BOOL bReuseaddr=TRUE;
    setsockopt(s, SOL_SOCKET ,SO_REUSEADDR, (const char*)&bReuseaddr, sizeof(BOOL));
    memset(&addrin, 0, sizeof(addrin));
    addrin.sin_family = AF_INET;
    addrin.sin_port = htons(PORT);
    addrin.sin_addr.s_addr = INADDR_ANY;
    bind(s, (struct sockaddr*)&addrin, sizeof(struct sockaddr));
    listen(s, BACKLOG);

    // 创建事件池 event base
    struct event_base* eb = event_base_new();

    // 创建事件 & 绑定回调
    struct event e;
    event_set(&e, s, EV_READ|EV_PERSIST, on_accept, NULL);

    // 注册事件
    event_base_set(eb, &e);
    event_add(&e, NULL);

    // 启动事件派发
    event_base_dispatch(eb);

    return 0;
}

抽取出上面事件通知逻辑的主线:创建事件池,创建一个 event 并绑定回调函数, 把 event 注册到事件池并启动事件派发器。
在这个过程中,事件生产者是socket(严格说是绑定在这个socket上的事件多路复用接口,如epoll),事件中转者是libevent中的事件池(event base)和事件派发器,事件消费者是事件处理回调函数。

同样的过程适用于odps event。事件池和派发器不需要用户创建。用户首先创建一个事件,然后绑定回调处理逻辑,最后把事件注册到事件池。代码如下:

public class TestOdpsEvent {
    /**
     * 创建事件方法
     */
    static Event buildEvent(String eventName, String tableName, String callbackUri, String comment) throws URISyntaxException {
        Event event = new Event();
        event.setName(eventName);   // 指定事件名称
        event.setComment(comment);    // 事件注释
        event.setType(Event.SourceType.TABLE);   // 指定事件类型,目前支持 TABLE
        Event.Config config = new Event.Config();
        config.setName("source");
        config.setValue(tableName);   // 指定事件源(即 表名). "*" 表示所有表.
        event.setConfig(config);
        event.setUri(new URI(callbackUri));   // 指定了一个回调地址
        return event;
    }

    public static void main(String[] args) throws OdpsException, InterruptedException, URISyntaxException {

        Account account = new AliyunAccount("xxxxxx", "xxxxxx");
        Odps odps = new Odps(account);
        String odps_endpoint = "http://xxxx/xxx";
        odps.setEndpoint(odps_endpoint);
        odps.setDefaultProject("project1");
        InternalOdps iodps = new InternalOdps(odps);

        // 创建事件 & 绑定回调
        String callbackUri = "http://xxx.xxx.xxx.xxx:xxxx/xxxxx"; // this is different from odps_endpoint
        Event e = buildEvent("table_create_event_1", "table1", callbackUri, "this is a test event"); 

        // 注册事件
        iodps.events().create(e);

        // 查看已创建事件
        Iterator<Event> events = iodps.events().iterator();
        while(events.hasNext()) {
            Event event1 = events.next();
            System.out.println("Event found: " + event1.getName());
            System.out.println("Event uri: " + event1.getUri());
            // iodps.events().delete(event1.getName()); // 删除事件
        }
    }
}

在上面的代码中,指定了一个回调地址。当表发生变化时,就会通知这个回调地址。用户根据在这个回调地址接收到事件通知,使用相应的处理逻辑处理。事件回调地址作为事件处理逻辑入口,支持多种协议,包括但不限于kuafu, http, https等。与libevent不同的是,odps event的生产者,中转者和消费者可以位于不同网络区域。在用户注册事件之后,odps event机制会在该事件发生后立即通知用户注册的回调地址。

剖析 odps 事件机制

图3-1的三个部分分别表示了注册事件,转发通知,删除事件的过程。MessageService是 ODPS 内部消息服务,作用是转发事件通知到用户注册的回调地址。为方便理解,把 Create topic, Create subscription, Add endpoint 看作注册事件在消息服务层的三个操作。事件机制在消息服务层具体的实现将在后边介绍。


图3-1: 事件创建,转发,删除

在图3-1注册事件的过程中,用户的请求由 OdpsWorker 的 createEventHandler 处理。createEventHandler 依次检查相应的 MessageService topic,subscription,endpoint 是否存在,如果不存在,创建。
在图3-1删除事件的过程相对简单,用户的请求由 OdpsWorker 的 deleteEventHandler 处理。deleteEventHandler 直接删除相应的 MessageService subscription。
在图3-1转发事件通知的过程中,事件的生产者主要是ddl task(事实上,由于历史原因,还有HiveServer,CREATETABLE 事件从这里发出),当执行对表的meta相关的操作时,就会触发ddl task。如drop table, add partition, insert into partition等。ddl task 会发送相应操作的事件通知。事件通知发送给事件中转者——消息服务。消息服务将这个事件通知发送给相应事件绑定的回调地址。
消息服务作为事件中转者,主要完成如下功能:
1)作为事件池维护不同事件和回调地址的对应关系(一个事件对应一个或多个回调地址)
2)作为事件派发器根据事件通知匹配相应事件并将该事件通知转发到对应事件的各个回调地址。
目前不同的事件依据两个属性区分:project名和事件源。事件源目前是表名。在ddl task发出的事件通知中,包含了这两个关键信息。消息服务根据这两个信息匹配相应事件。在介绍消息服务匹配的实现方式时,首先需要了解 ODPS消息服务 的基本概念(为便于理解,本文简化了消息服务的一些概念,如隐藏了partition概念。在文章[3]中,具体介绍了消息服务的设计和实现)。如图3-2:


图3-2: 消息服务基本概念

odps消息服务包含四个基本概念:topic, subscription, filter, endpoint。消息服务使用了典型的发布订阅模型。用户可以创建topic。创建一个或多个subscription(包含一个或多个endpoint)订阅这个topic。消息发布者向topic发送消息。该消息被转发到该topic的所有filter匹配的subscription的所有的endpoint。其中,topic的创建者,subscription的创建者,消息的发送者,以及消息的接收者可以是不同的用户。在创建subscription时,需要指定filter matcher。在消息发送时需要指定filter。当某条消息发送到某个topic时,消息中的filter需要和这个topic的各个subscription的filter matcher匹配,如果匹配成功,将这个消息的一个副本发送给这个subscription的所有endpoint,否则不发送给它们,然后继续匹配其他的subscription。filter和filter_matcher的示例和匹配规则如下:

filter_matcher filter is matched
"" "k=v" yes. If filter_matcher is "", it will match forever.
"k=v" "k=v" yes
"k=v" "k=v1" no
"k1=v" "k=v" no
"k=v1|v2" "k=v1" yes
"k=v1|v2" "k=v2" yes
"k=v1|v2" "k=v1|v2" no. filter's value is 'v1|v2', not 'v1' or 'v2'.
"k1=v1,k2=v2" "k1=v1,k2=v2" yes
"k1=v1" "k1=v1,k2=v2" yes
"k1=v1,k2=v2" "k1=v1" no
"k=v" "" no
"" "" yes. If filter is "", filter_matcher will never hit except its value is ""

消息服务的这个机制,可以实现上述事件中转者的功能。将一个事件表达为一个subscription,将一个事件通知表达为一条消息,每个endpoint记录一个回调地址。每个project对应一个topic,用filter区分事件源。当一个事件通知产生之后,会被发送到产生通知的project所在的topic上。然后,经过匹配,转发所有的endpoint对应回调地址上。事件通知消息体示例如下:

<?xml version="1.0" encoding="UTF-8"?>
<Notification>
  <Account>ALIYUN$odpstest1@aliyun.com</Account>
  <Project>a_2_test_event</Project>
  <SourceType>Table</SourceType>
  <SourceName>backup_partition</SourceName>
  <Reason>CREATETABLE</Reason>
  <TimeStamp>Sun, 18 Sep 2016 14:21:32 GMT</TimeStamp>
  <Properties/>
  <OdpsMessagerId>1</OdpsMessagerId>
  <OdpsMessagerTime>1474208492</OdpsMessagerTime>
</Notification>

<?xml version="1.0" encoding="UTF-8"?>
<Notification>
  <Account>ALIYUN$odpstest1@aliyun.com</Account>
  <Project>a_2_test_event</Project>
  <SourceType>Table</SourceType>
  <SourceName>backup_partition</SourceName>
  <Reason>ADDPARTITION</Reason>
  <TimeStamp>Mon, 19 Sep 2016 12:45:42 GMT</TimeStamp>
  <Properties>
    <Property>
      <Name>Name</Name>
      <Value>ds=ds1/pt=pt1</Value>
    </Property>
  </Properties>
  <OdpsMessagerId>4</OdpsMessagerId>
  <OdpsMessagerTime>1474289142</OdpsMessagerTime>
</Notification>

当用户订阅了 Project a_2_test_event 的 Table "backup_partition" 的事件后,当发生对这个表的 CREATETABLE 和 ADDPARTITION 操作后,会接收到上面的两个事件通知。每个事件通知是一个 xml 格式的消息。SourceType 表示订阅的是表的事件通知还是其它类型资源的事件通知(目前只支持表)。SourceName 表示订阅的表的名字。Reason 表示在该表上发生的操作,上例中分别是创建表的操作和增加分区的操作(在 附录 中列举了更多的操作类型)。Properties 中会有一些附加的通知属性,常用来指出操作发生在表的哪个 parition 上。OdpsMessagerId 在一个 Project 的所有表中是唯一的。OdpsMessagerTime 是这条通知产生的时刻。

在odps线上服务环境中,每一个project p1,就会对应一个名为 SQL_p1的topic(因为历史原因hardcode了前缀SQL_,不过前缀是什么无所谓,只要可以区分事件机制的topic和其它应用中的topic就好),这个 topic 在第一次注册事件的时候自动创建(也可以在创建project时手动创建)。p1的所有事件通知都会发到这个topic上。这个topic在其对应的project删除时被删除。
odps消息服务为事件机制提供了对事件通知的持久化,保序,failover的功能,尽最大努力保证消息不丢,但是依然不能保证绝对不丢。下面分析事件机制可能出现的丢消息情况:事件生产者失败,消息服务失败,事件接收者失败,消息服务热升级。
1) 事件生产者失败:在事件通知到达消息服务之前,存在事件通知生产者失败的可能。具体的消息丢失概率取决于事件生产者的持久化,failover能力以及重试机制。
2) 消息服务失败:消息服务失败包括两种情况:消息到达消息服务前失败和消息到达之后失败。如果消息到达之前失败,那么消息服务提供的message client会重试3次,每次间隔5毫秒。如果事件成功地发送到消息服务,消息会首先被持久化。在消息服务中的一条消息只有满足下列两种情况才会被删除:a. 消息发送成功;b. 消息发送失败且超过重试次数(目前重试3600次,每次间隔60秒)。可以看到,事件(消息)丢失最大的风险在于发送到达消息服务之前的一段时间。
3) 事件接收者失败:如果接收者失败,且在获得事件通知之后,处理事件通知之前,消息服务不提供接口使接收者重获这条消息。当然对于这个问题,还有另一种解决方法,就是使用类似kafka的消息服务模型作为中转者,可以保证事件通知更强大的可靠性。kafka模型[4]不会主动推送消息,仅仅对消息做持久化以实现高吞吐和高可靠。消息订阅者需要给定消息id的范围从某个topic的partition拉消息。当订阅者失败,希望重新获得历史的消息时,只要给定消息id的范围,如果这个范围内的消息没有过期,就可以被重新获得。但是kafka模型使用拉消息的模式不具备完整的事件派发器功能,也就不能支持现在odps需要的异步事件通知编程方式。而事实上,odps消息服务设计的出发点,就是odps事件通知机制(在没有消息服务之前,odps worker履行着消息服务的职责)。
4)消息服务热升级:虽然说是热升级,但是新老服务之间切换也是需要时间的。在线上这个时间最夸张的一次达到了4个多小时(所有topic全部切换完成的时间间隔)。而在切换的过程中,新老消息服务中处于切换中间状态的 topic 是拒绝服务的(切换完成的 topic 可以服务)。
总之,odps事件通知机制提供了一定程度的高可用保证,但是还没有把丢消息的概率降低为0,其最大风险在于消息服务不服务。而此时,消息服务上某个 topic 丢失消息的数量和该 topic 不服务的时间成正比。

总结

odps事件机制给用户监听资源的变化带来了很大的便利。它借鉴了通用的事件异步编程模型,提供了友好的用户接口,支持了线上数据地图,跨集群复制等众多服务。但是,依然有不足之处,例如:
1) 事件监听(订阅/注册)的粒度粗且不可定制:我们曾经接到用户的需求,想监听一个表的 CREATETABLE 事件,但是现有的机制只支持监听到表的级别,这样用户就不得不自己过滤这个表的各种事件。
2) 事件机制的可靠性需要进一步提高:曾经出现过热升级切换消息服务4个小时的情况,原因是其中一个 topic 向某个 endpoint 发送消息卡在了发送那里一直无法退出,造成该 topic 上丢失大量消息。
3) 消息服务的 生产者qps(生产环境接收消息1000-2000),消费者 qps(消费极限qps未测过,因其取决于) 与 开源消息服务如 kafka 生产者 qps (50,000),消费者 qps (22,000) 依然有一定差距[4]。

对于当前希望使用odps消息服务的用户,最好确保满足以下条件:
1) 允许丢失少量的消息通知,因为的确存在小概率丢消息的可能;
2) 事件处理系统具有一定的事件处理能力,接受事件qps最好可以达到500以上。
3) 不用的事件(回调uri发不通且不再使用)请删除,否则会在消息服务中留下永久性垃圾,造成消息在pangu中大量堆积,因为消息服务无法判断用户的事件是否需要删除!!!

解决上述的问题目前依然有一些挑战,但是我们会不断改进和完善事件机制的各项功能,减小事件丢失率,细化事件订阅粒度,优化用户体验。

参考资料

[1] Event restful api

[2] Libevent: http://libevent.org
[3] Odps Message Service
[4] Kreps J, Narkhede N, Rao J. Kafka: A distributed messaging system for log processing[C]//Proceedings of the NetDB. 2011: 1-7.

附录

odps事件机制事件类型列表

在触发 DDL 时,Event 会向预先注册的 url 发送 POST 请求,消息体格式如下

<?xml version="1.0" encoding="UTF-8"?>
<Notification>
  <Account>ALIYUN$odpstest1@aliyun.com</Account>
  <Project>a_2_test_event</Project>
  <SourceType>Table</SourceType>
  <SourceName>backup_partition</SourceName>
  <Reason>ADDPARTITION</Reason>
  <TimeStamp>Mon, 19 Sep 2016 12:45:42 GMT</TimeStamp>
  <Properties>
    <Property>
      <Name>Name</Name>
      <Value>ds=ds1/pt=pt1</Value>
    </Property>
  </Properties>
  <OdpsMessagerId>4</OdpsMessagerId>
  <OdpsMessagerTime>1474289142</OdpsMessagerTime>
</Notification>

其中:

Reason 可能取值 事件生产者
CREATETABLE hiveserver
DROPTABLE ddltask
ALTERTABLE ddltask
ADDPARTITION ddltask
DROPPARTITION ddltask
ALTERPARTITION ddltask
INSERTOVERWRITETABLE ddltask
INSERTINTOTABLE ddltask
INSERTOVERWRITEPARTITION ddltask
INSERTINTOPARTITION ddltask
MERGETABLE ddltask
MERGEPARTITION ddltask
ALTERVOLUMEPARTITION ddltask
ADDVOLUMEPARTITION ddltask
  • SourceType 可能取值:Table

使用限制

1) 目前只有 Project Owner 可以创建 event,无法授权给其他人创建 event
2) 接收 post 信息的 url 应返回 http code 200,server 端 post 时并不支持如 302 这样的跳转。

欢迎加入MaxCompute钉钉群讨论

时间: 2024-09-13 19:54:20

MaxCompute(原ODPS) 事件(Event)机制的相关文章

MaxCompute( 原ODPS)下的表分区解释

大数据计算服务(MaxCompute,原名 ODPS,https://www.aliyun.com/product/odps)是一种快速.完全托管的 GB/TB/PB 级数据仓库解决方案.MaxCompute 向用户提供了完善的数据导入方案以及多种经典的分布式计算模型,能够更快速的解决用户海量数据计算问题,有效降低企业成本,并保障数据安全. ODPS分区字段一般来说都是重复性非常强的字段,比如说时间,某一天可能会产生几万条数据,把这一天产生的数据就存入到一个分区中,而时间(某天)就是分区字段,时

5分钟学会使用DataHub接入实时数据到MaxCompute(原ODPS)

免费开通大数据服务:https://www.aliyun.com/product/odps DataHub服务是MaxCompute提供的流数据服务, 并提供把实时数据准实时归档到MaxCompute中功能, 在延时上可以做到5分钟数据在MaxCompute中可见:相对于MaxCompute之前提供的批量数据接口Tunnel实时性有了极大的提高.本文简要介绍如何快速通过DataHub创建实时数据写入MaxCompute的数据通道. 准备MaxCompute表 假设我们准备的MaxCompute表

【大数据干货】数据进入阿里云数加-大数据计算服务MaxCompute(原ODPS)的N种方式

免费开通大数据服务:https://www.aliyun.com/product/odps 想用阿里云大数据计算服务(MaxCompute),对于大多数人首先碰到的问题就是数据如何迁移到MaxCompute中.按照数据迁移场景,大致可以分为批量数据.实时数据.本地文件.日志文件等的迁移,下面我们针对每种场景分别介绍几种常用方案. 大数据计算服务(MaxCompute) 快速.完全托管的TB/PB级数据仓库解决方案,向用户提供了完善的数据导入方案以及多种经典的分布式计算模型,能够更快速的解决用户海

MaxCompute(原ODPS)开发入门指南——数据开发工具篇

MaxCompute(原ODPS)开发入门指南--数据开发工具篇 写在最前面 >>>进入了解更多>>>阿里云数加·MaxCompute大数据计算服务. 大家在使用大数据计算服务MaxCompute时,最头疼就是我现在已有的数据如何快速上云?我的日志数据如何采集到MaxCompute上?等等...具体详见<MaxCompute(原ODPS)开发入门指南--数据上云篇>. 但是数据在MaxCompute上了之后,问题又来了,我怎么基于上面进行快速的数据开发,构建

MaxCompute(原ODPS)开发入门指南——数据上云篇

MaxCompute(原ODPS)开发入门指南--数据上云篇 写在最前面 >>>进入了解更多>>>阿里云数加·MaxCompute大数据计算服务. 根据<MaxCompute(原ODPS)开发入门指南--计量计费篇>的了解,大家清楚了MaxCompute可以做什么,计费模式如何,想必大家也开通了MaxCompute想进行一次POC,但是大家遇到第一个问题一定是我的数据如何上云? 可通过多种方式数据流入MaxCompute MaxCompute(原ODPS)提

从MapReduce的执行来看如何优化MaxCompute(原ODPS) SQL

SQL基础有这些操作(按照执行顺序来排列): from join(left join, right join, inner join, outer join ,semi join) where group by select sum distinct count order by 如果我们能理解mapreduce是怎么实现这些SQL中的基本操作的,那么我们将很容易理解怎么优化SQL写法.接下来我们一个一个的谈: from 这个操作是在解析过程中就完成了,目的就是找出输入的表(文件). join(

阿里云大数据计算服务MaxCompute(原ODPS)华南1(深圳)Region即将开服!

2017年9月7日,阿里云数加·MaxCompute(原ODPS)华南1(深圳)数据中心正式开服售卖,这是数加·MaxCompute在国内开服的第二个区域.届时MaxCompute将会针对新服开展促销活动,具体活动规则敬请期待! 关于售价 华南1区域价格与华东2一致,主要收费分3部分:存储.计算.下载,其中计算(指SQL和MR计算任务)分预付费.按量后付费两种模式,存储和下载都是按量后付费.做预算的具体的售价信息请看官网定价页或<计量计费文档>. 关于开通 确保云账号是实名认证的账号,在开通购

MaxCompute(原ODPS)开发入门指南——计量计费篇

MaxCompute(原ODPS)开发入门指南 写在最前面 >>>进入了解更多>>>阿里云数加·MaxCompute大数据计算服务. 近期介绍大量数据上云用户关于MaxCompute的一些问题,现就MaxCompute产品线的一些工具栈可以和大家进行交流,也欢迎大家拍砖和来扰,一起学习一起进步!也希望能够在帮助到大家! 系列文章会涉及到的内容 0.MaxCompute概述:是什么?可以做什么?收费模式? 1.数据上云工具介绍:Log.Logstash.Flume.Flu

山寨版AS3事件冒泡机制的实现

AS3实现了事件传递,分成三段Capture,target和bubble,其中bubble就是向树根传递.这个机制非常之经典和好用,可惜它只存在在基于DisplayObject的对象树中,其它时候只能把事件老老实实的从一个对象直接传递到另一个对象. 这次我来玩个有趣的.仿造AS3内置的机制来制作一个更加通用的冒泡框架,相信可以用很多用处(至少已经用在我的项目上了).自己实现事件传递,让它支持任何事件任何数据结构(无论队列,还是树),甚至可以传递任何对象(而不仅限于事件对象).其实bubble的过