免费开通大数据服务:https://www.aliyun.com/product/odps
转自habai
什么是 ODPS 事件机制
Odps event 用于监控表和实例等odps资源(目前只用于监控表)。当表状态发生变化时,odps 会向预先注册(订阅)的 uri 发送信息。Event通知只有在订阅Event之后才能收到。每个project中可以订阅一个或多个Event。Event是用户的数据,同表数据一样,创建或修改时都需要有这个Project的操作权限。关于Event的Restful Api,在文章[1]里有介绍。
为什么需要 ODPS 事件机制
考虑以下场景:当一个用户 A 关心某一个表 T 的操作(创建/删除/插入/修改/...)时,如果表 T 不是用户 A 创建的,那么用户 A 可以采用什么方法感知这个操作?一个方法是主动轮询这个表是否做了某个操作,但是缺点是不言而喻的。另一个方法是,注册一个回调,当表被操作时,被动接受通知。用这种方法可以使用户逻辑不必轮询和等待对表的操作。ODPS Event机制就是第二种方法的实现。
在实际的生产中,对以上应用场景有大量的需求,并已经形成了对Odps Event丰富的应用,例如:
- 数据地图: 订阅了一些 project 的 Event,并根据 Event 通知展示这些 project 中表的元数据。
- 跨集群复制: 监听 Event 通知以复制相应的表。
- 蚂蚁金服: 依赖事件通知机制进行工作流管理,统计,授权等工作。 事实上,每个 project 都有大量用户订阅了所属project的表以及其它project表的事件通知。
ODPS 事件机制是怎样实现的
本节首先将 ODPS 事件机制 作为一个黑盒,从用户的角度介绍其功能和使用方法。而后以此为切入点,深入剖析 ODPS 事件机制的内部机理。最后,提出一些对当前事件机制的思考。
订阅(注册)一个事件 & 事件通知
在网络编程中,为了减轻多线程的压力,往往使用事件通知驱动的异步编程。如,libevent[2]。使用这个库编写一个服务器程序,可以这样做:
void on_accept(int sock, short event, void* arg);
int main(int argc, char* argv[])
{
// create socket s
struct sockadddr_in addrin;
int s = socket(AF_INET, SOCK_STREAM, 0);
BOOL bReuseaddr=TRUE;
setsockopt(s, SOL_SOCKET ,SO_REUSEADDR, (const char*)&bReuseaddr, sizeof(BOOL));
memset(&addrin, 0, sizeof(addrin));
addrin.sin_family = AF_INET;
addrin.sin_port = htons(PORT);
addrin.sin_addr.s_addr = INADDR_ANY;
bind(s, (struct sockaddr*)&addrin, sizeof(struct sockaddr));
listen(s, BACKLOG);
// 创建事件池 event base
struct event_base* eb = event_base_new();
// 创建事件 & 绑定回调
struct event e;
event_set(&e, s, EV_READ|EV_PERSIST, on_accept, NULL);
// 注册事件
event_base_set(eb, &e);
event_add(&e, NULL);
// 启动事件派发
event_base_dispatch(eb);
return 0;
}
抽取出上面事件通知逻辑的主线:创建事件池,创建一个 event 并绑定回调函数, 把 event 注册到事件池并启动事件派发器。
在这个过程中,事件生产者是socket(严格说是绑定在这个socket上的事件多路复用接口,如epoll),事件中转者是libevent中的事件池(event base)和事件派发器,事件消费者是事件处理回调函数。
同样的过程适用于odps event。事件池和派发器不需要用户创建。用户首先创建一个事件,然后绑定回调处理逻辑,最后把事件注册到事件池。代码如下:
public class TestOdpsEvent {
/**
* 创建事件方法
*/
static Event buildEvent(String eventName, String tableName, String callbackUri, String comment) throws URISyntaxException {
Event event = new Event();
event.setName(eventName); // 指定事件名称
event.setComment(comment); // 事件注释
event.setType(Event.SourceType.TABLE); // 指定事件类型,目前支持 TABLE
Event.Config config = new Event.Config();
config.setName("source");
config.setValue(tableName); // 指定事件源(即 表名). "*" 表示所有表.
event.setConfig(config);
event.setUri(new URI(callbackUri)); // 指定了一个回调地址
return event;
}
public static void main(String[] args) throws OdpsException, InterruptedException, URISyntaxException {
Account account = new AliyunAccount("xxxxxx", "xxxxxx");
Odps odps = new Odps(account);
String odps_endpoint = "http://xxxx/xxx";
odps.setEndpoint(odps_endpoint);
odps.setDefaultProject("project1");
InternalOdps iodps = new InternalOdps(odps);
// 创建事件 & 绑定回调
String callbackUri = "http://xxx.xxx.xxx.xxx:xxxx/xxxxx"; // this is different from odps_endpoint
Event e = buildEvent("table_create_event_1", "table1", callbackUri, "this is a test event");
// 注册事件
iodps.events().create(e);
// 查看已创建事件
Iterator<Event> events = iodps.events().iterator();
while(events.hasNext()) {
Event event1 = events.next();
System.out.println("Event found: " + event1.getName());
System.out.println("Event uri: " + event1.getUri());
// iodps.events().delete(event1.getName()); // 删除事件
}
}
}
在上面的代码中,指定了一个回调地址。当表发生变化时,就会通知这个回调地址。用户根据在这个回调地址接收到事件通知,使用相应的处理逻辑处理。事件回调地址作为事件处理逻辑入口,支持多种协议,包括但不限于kuafu, http, https等。与libevent不同的是,odps event的生产者,中转者和消费者可以位于不同网络区域。在用户注册事件之后,odps event机制会在该事件发生后立即通知用户注册的回调地址。
剖析 odps 事件机制
图3-1的三个部分分别表示了注册事件,转发通知,删除事件的过程。MessageService是 ODPS 内部消息服务,作用是转发事件通知到用户注册的回调地址。为方便理解,把 Create topic, Create subscription, Add endpoint 看作注册事件在消息服务层的三个操作。事件机制在消息服务层具体的实现将在后边介绍。
在图3-1注册事件的过程中,用户的请求由 OdpsWorker 的 createEventHandler 处理。createEventHandler 依次检查相应的 MessageService topic,subscription,endpoint 是否存在,如果不存在,创建。
在图3-1删除事件的过程相对简单,用户的请求由 OdpsWorker 的 deleteEventHandler 处理。deleteEventHandler 直接删除相应的 MessageService subscription。
在图3-1转发事件通知的过程中,事件的生产者主要是ddl task(事实上,由于历史原因,还有HiveServer,CREATETABLE 事件从这里发出),当执行对表的meta相关的操作时,就会触发ddl task。如drop table, add partition, insert into partition等。ddl task 会发送相应操作的事件通知。事件通知发送给事件中转者——消息服务。消息服务将这个事件通知发送给相应事件绑定的回调地址。
消息服务作为事件中转者,主要完成如下功能:
1)作为事件池维护不同事件和回调地址的对应关系(一个事件对应一个或多个回调地址)
2)作为事件派发器根据事件通知匹配相应事件并将该事件通知转发到对应事件的各个回调地址。
目前不同的事件依据两个属性区分:project名和事件源。事件源目前是表名。在ddl task发出的事件通知中,包含了这两个关键信息。消息服务根据这两个信息匹配相应事件。在介绍消息服务匹配的实现方式时,首先需要了解 ODPS消息服务 的基本概念(为便于理解,本文简化了消息服务的一些概念,如隐藏了partition概念。在文章[3]中,具体介绍了消息服务的设计和实现)。如图3-2:
odps消息服务包含四个基本概念:topic, subscription, filter, endpoint。消息服务使用了典型的发布订阅模型。用户可以创建topic。创建一个或多个subscription(包含一个或多个endpoint)订阅这个topic。消息发布者向topic发送消息。该消息被转发到该topic的所有filter匹配的subscription的所有的endpoint。其中,topic的创建者,subscription的创建者,消息的发送者,以及消息的接收者可以是不同的用户。在创建subscription时,需要指定filter matcher。在消息发送时需要指定filter。当某条消息发送到某个topic时,消息中的filter需要和这个topic的各个subscription的filter matcher匹配,如果匹配成功,将这个消息的一个副本发送给这个subscription的所有endpoint,否则不发送给它们,然后继续匹配其他的subscription。filter和filter_matcher的示例和匹配规则如下:
filter_matcher | filter | is matched |
---|---|---|
"" | "k=v" | yes. If filter_matcher is "", it will match forever. |
"k=v" | "k=v" | yes |
"k=v" | "k=v1" | no |
"k1=v" | "k=v" | no |
"k=v1|v2" | "k=v1" | yes |
"k=v1|v2" | "k=v2" | yes |
"k=v1|v2" | "k=v1|v2" | no. filter's value is 'v1|v2', not 'v1' or 'v2'. |
"k1=v1,k2=v2" | "k1=v1,k2=v2" | yes |
"k1=v1" | "k1=v1,k2=v2" | yes |
"k1=v1,k2=v2" | "k1=v1" | no |
"k=v" | "" | no |
"" | "" | yes. If filter is "", filter_matcher will never hit except its value is "" |
消息服务的这个机制,可以实现上述事件中转者的功能。将一个事件表达为一个subscription,将一个事件通知表达为一条消息,每个endpoint记录一个回调地址。每个project对应一个topic,用filter区分事件源。当一个事件通知产生之后,会被发送到产生通知的project所在的topic上。然后,经过匹配,转发所有的endpoint对应回调地址上。事件通知消息体示例如下:
<?xml version="1.0" encoding="UTF-8"?>
<Notification>
<Account>ALIYUN$odpstest1@aliyun.com</Account>
<Project>a_2_test_event</Project>
<SourceType>Table</SourceType>
<SourceName>backup_partition</SourceName>
<Reason>CREATETABLE</Reason>
<TimeStamp>Sun, 18 Sep 2016 14:21:32 GMT</TimeStamp>
<Properties/>
<OdpsMessagerId>1</OdpsMessagerId>
<OdpsMessagerTime>1474208492</OdpsMessagerTime>
</Notification>
<?xml version="1.0" encoding="UTF-8"?>
<Notification>
<Account>ALIYUN$odpstest1@aliyun.com</Account>
<Project>a_2_test_event</Project>
<SourceType>Table</SourceType>
<SourceName>backup_partition</SourceName>
<Reason>ADDPARTITION</Reason>
<TimeStamp>Mon, 19 Sep 2016 12:45:42 GMT</TimeStamp>
<Properties>
<Property>
<Name>Name</Name>
<Value>ds=ds1/pt=pt1</Value>
</Property>
</Properties>
<OdpsMessagerId>4</OdpsMessagerId>
<OdpsMessagerTime>1474289142</OdpsMessagerTime>
</Notification>
当用户订阅了 Project a_2_test_event 的 Table "backup_partition" 的事件后,当发生对这个表的 CREATETABLE 和 ADDPARTITION 操作后,会接收到上面的两个事件通知。每个事件通知是一个 xml 格式的消息。SourceType 表示订阅的是表的事件通知还是其它类型资源的事件通知(目前只支持表)。SourceName 表示订阅的表的名字。Reason 表示在该表上发生的操作,上例中分别是创建表的操作和增加分区的操作(在 附录 中列举了更多的操作类型)。Properties 中会有一些附加的通知属性,常用来指出操作发生在表的哪个 parition 上。OdpsMessagerId 在一个 Project 的所有表中是唯一的。OdpsMessagerTime 是这条通知产生的时刻。
在odps线上服务环境中,每一个project p1,就会对应一个名为 SQL_p1的topic(因为历史原因hardcode了前缀SQL_,不过前缀是什么无所谓,只要可以区分事件机制的topic和其它应用中的topic就好),这个 topic 在第一次注册事件的时候自动创建(也可以在创建project时手动创建)。p1的所有事件通知都会发到这个topic上。这个topic在其对应的project删除时被删除。
odps消息服务为事件机制提供了对事件通知的持久化,保序,failover的功能,尽最大努力保证消息不丢,但是依然不能保证绝对不丢。下面分析事件机制可能出现的丢消息情况:事件生产者失败,消息服务失败,事件接收者失败,消息服务热升级。
1) 事件生产者失败:在事件通知到达消息服务之前,存在事件通知生产者失败的可能。具体的消息丢失概率取决于事件生产者的持久化,failover能力以及重试机制。
2) 消息服务失败:消息服务失败包括两种情况:消息到达消息服务前失败和消息到达之后失败。如果消息到达之前失败,那么消息服务提供的message client会重试3次,每次间隔5毫秒。如果事件成功地发送到消息服务,消息会首先被持久化。在消息服务中的一条消息只有满足下列两种情况才会被删除:a. 消息发送成功;b. 消息发送失败且超过重试次数(目前重试3600次,每次间隔60秒)。可以看到,事件(消息)丢失最大的风险在于发送到达消息服务之前的一段时间。
3) 事件接收者失败:如果接收者失败,且在获得事件通知之后,处理事件通知之前,消息服务不提供接口使接收者重获这条消息。当然对于这个问题,还有另一种解决方法,就是使用类似kafka的消息服务模型作为中转者,可以保证事件通知更强大的可靠性。kafka模型[4]不会主动推送消息,仅仅对消息做持久化以实现高吞吐和高可靠。消息订阅者需要给定消息id的范围从某个topic的partition拉消息。当订阅者失败,希望重新获得历史的消息时,只要给定消息id的范围,如果这个范围内的消息没有过期,就可以被重新获得。但是kafka模型使用拉消息的模式不具备完整的事件派发器功能,也就不能支持现在odps需要的异步事件通知编程方式。而事实上,odps消息服务设计的出发点,就是odps事件通知机制(在没有消息服务之前,odps worker履行着消息服务的职责)。
4)消息服务热升级:虽然说是热升级,但是新老服务之间切换也是需要时间的。在线上这个时间最夸张的一次达到了4个多小时(所有topic全部切换完成的时间间隔)。而在切换的过程中,新老消息服务中处于切换中间状态的 topic 是拒绝服务的(切换完成的 topic 可以服务)。
总之,odps事件通知机制提供了一定程度的高可用保证,但是还没有把丢消息的概率降低为0,其最大风险在于消息服务不服务。而此时,消息服务上某个 topic 丢失消息的数量和该 topic 不服务的时间成正比。
总结
odps事件机制给用户监听资源的变化带来了很大的便利。它借鉴了通用的事件异步编程模型,提供了友好的用户接口,支持了线上数据地图,跨集群复制等众多服务。但是,依然有不足之处,例如:
1) 事件监听(订阅/注册)的粒度粗且不可定制:我们曾经接到用户的需求,想监听一个表的 CREATETABLE 事件,但是现有的机制只支持监听到表的级别,这样用户就不得不自己过滤这个表的各种事件。
2) 事件机制的可靠性需要进一步提高:曾经出现过热升级切换消息服务4个小时的情况,原因是其中一个 topic 向某个 endpoint 发送消息卡在了发送那里一直无法退出,造成该 topic 上丢失大量消息。
3) 消息服务的 生产者qps(生产环境接收消息1000-2000),消费者 qps(消费极限qps未测过,因其取决于) 与 开源消息服务如 kafka 生产者 qps (50,000),消费者 qps (22,000) 依然有一定差距[4]。
对于当前希望使用odps消息服务的用户,最好确保满足以下条件:
1) 允许丢失少量的消息通知,因为的确存在小概率丢消息的可能;
2) 事件处理系统具有一定的事件处理能力,接受事件qps最好可以达到500以上。
3) 不用的事件(回调uri发不通且不再使用)请删除,否则会在消息服务中留下永久性垃圾,造成消息在pangu中大量堆积,因为消息服务无法判断用户的事件是否需要删除!!!
解决上述的问题目前依然有一些挑战,但是我们会不断改进和完善事件机制的各项功能,减小事件丢失率,细化事件订阅粒度,优化用户体验。
参考资料
[1] Event restful api
[2] Libevent: http://libevent.org
[3] Odps Message Service
[4] Kreps J, Narkhede N, Rao J. Kafka: A distributed messaging system for log processing[C]//Proceedings of the NetDB. 2011: 1-7.
附录
odps事件机制事件类型列表
在触发 DDL 时,Event 会向预先注册的 url 发送 POST 请求,消息体格式如下
<?xml version="1.0" encoding="UTF-8"?>
<Notification>
<Account>ALIYUN$odpstest1@aliyun.com</Account>
<Project>a_2_test_event</Project>
<SourceType>Table</SourceType>
<SourceName>backup_partition</SourceName>
<Reason>ADDPARTITION</Reason>
<TimeStamp>Mon, 19 Sep 2016 12:45:42 GMT</TimeStamp>
<Properties>
<Property>
<Name>Name</Name>
<Value>ds=ds1/pt=pt1</Value>
</Property>
</Properties>
<OdpsMessagerId>4</OdpsMessagerId>
<OdpsMessagerTime>1474289142</OdpsMessagerTime>
</Notification>
其中:
Reason 可能取值 | 事件生产者 |
---|---|
CREATETABLE | hiveserver |
DROPTABLE | ddltask |
ALTERTABLE | ddltask |
ADDPARTITION | ddltask |
DROPPARTITION | ddltask |
ALTERPARTITION | ddltask |
INSERTOVERWRITETABLE | ddltask |
INSERTINTOTABLE | ddltask |
INSERTOVERWRITEPARTITION | ddltask |
INSERTINTOPARTITION | ddltask |
MERGETABLE | ddltask |
MERGEPARTITION | ddltask |
ALTERVOLUMEPARTITION | ddltask |
ADDVOLUMEPARTITION | ddltask |
- SourceType 可能取值:Table
使用限制
1) 目前只有 Project Owner 可以创建 event,无法授权给其他人创建 event
2) 接收 post 信息的 url 应返回 http code 200,server 端 post 时并不支持如 302 这样的跳转。
欢迎加入MaxCompute钉钉群讨论