zookeeper watcher使用注意点

背景

 项目中使用了zookeeper进行的类似工作流引擎的工作流转,将一次工作请求拆分了4个节点(S/E/T/L)。S阶段做完后,通过zk的watcher触发下一个E节点进行处理,S和E可能为不同的jvm上,所以需要走一个分布式的消息进行通知。

思路

基于zookeeper做持久化watcher,项目中直接使用zookeeper官方api,大致的工作模型:

 

1.private synchronized void initNodes(List<String> nodes) {
2.    // 根据zk节点,判断是否需要处理
3.}
4.
5.private void syncNodes() {
6.        try {
7.            List<String> nodes = zookeeper.getChildren(ArbitrateConstants.NODE_NID_ROOT, new AsyncWatcher() {
8.
9.                public void asyncProcess(WatchedEvent event) {
10.                    syncNodes();// 继续关注node节点变化
11.                }
12.            });
13.
14.            initNodes(nodes);
15.        } catch (KeeperException e) {
16.            syncNodes();
17.        } catch (InterruptedException e) {
18.            // ignore
19.        }
20.    }

  • 有两个方法initNodes 和 syncNodes,  syncNodes主要是监听zookeeper的节点变化
  • syncNodes会通过级联方式,在每次watcher被触发后,就会再挂一次watcher。完成了一个类似链式触发的功能

遇到的问题

系统上线运行后,跑了几天时间,跑出了一个OutOfMemory的问题,jmap dump了下对应的内存数据文件,发现了一个zk使用上的问题.

 

 

 

a. 通过mat分析查看了下jvm中占用内存最大的对象,居然是zookeeper中的一个waitingEvents.: 

b. waitingEvents中的WatcherSetEventPair对象中,包含了一个待响应的watchers和对应的响应event事件对象,对应的watchers数量居然有300W个

问题分析:

分析了下WatcherSetEventPair中的处理机制。 

 

Event响应中对应EventType的枚举类型:(存在一个特殊的None类型)

 

 

  • None (-1),
  • NodeCreated (1),
  • NodeDeleted (2),
  • NodeDataChanged (3),
  • NodeChildrenChanged (4);

查了下代码,None类型会在Session expired / connection loss/  auth failed得到对应的触发,对应的触发path为null

代码:

1.eventThread.queueEvent(new WatchedEvent(
2.                        Watcher.Event.EventType.None,
3.                        Watcher.Event.KeeperState.Expired, null));

针对None类型,在获取对应的watcher响应时:

1.public Set<Watcher> materialize(Watcher.Event.KeeperState state,
2.                                        Watcher.Event.EventType type,
3.                                        String clientPath)
4.        {
5.            Set<Watcher> result = new HashSet<Watcher>();
6.
7.            switch (type) {
8.            case None:
9.                result.add(defaultWatcher);
10.                for(Set<Watcher> ws: dataWatches.values()) {
11.                    result.addAll(ws);
12.                }
13.                for(Set<Watcher> ws: existWatches.values()) {
14.                    result.addAll(ws);
15.                }
16.                for(Set<Watcher> ws: childWatches.values()) {
17.                    result.addAll(ws);
18.                }
19.
20.                // clear the watches if auto watch reset is not enabled
21.                if (ClientCnxn.getDisableAutoResetWatch() &&
22.                        state != Watcher.Event.KeeperState.SyncConnected)
23.                {
24.                    synchronized(dataWatches) {
25.                        dataWatches.clear();
26.                    }
27.                    synchronized(existWatches) {
28.                        existWatches.clear();
29.                    }
30.                    synchronized(childWatches) {
31.                        childWatches.clear();
32.                    }
33.                }
34.
35.                return result;

针对出现None的类型,会将所有的watcher进行触发,同时并不会移除watcher,所以,watcher会在下一次reconnect成功后再次触发,除非设置DisableAutoResetWatch

总结

a.  需要明确watcher的触发条件和触发case场景。特别注意,None类型可能会引起触发2次watcher调用

 

(截取了淘宝同学的blog : http://rdc.taobao.com/team/jm/archives/1047)

 

event For “/path”defaultWatcherexists
(“/path”)
getData
(“/path”)
getChildren
(“/path”)

EventType.None
EventType.NodeCreated    
EventType.NodeDeleted    
EventType.NodeDataChanged    
EventType.NodeChildrenChanged      

b.  出现session expired,需要重建zookeeper connector,对应的watcher会失效。因为watcher在client的存储是和对应的zookeeper client绑定,不同的client有不同的watcher列表。

时间: 2024-10-07 08:53:47

zookeeper watcher使用注意点的相关文章

ZooKeeper Watcher 和 AsyncCallback 的区别与实现

前言 初学 Zookeeper 会发现客户端有两种回调方式: Watcher 和 AsyncCallback,而 Zookeeper 的使用是离不开这两种方式的,搞清楚它们之间的区别与实现显得尤为重要.本文将围绕下面几个方面展开 Watcher 和 AsyncCallback 的区别 Watcher 的回调实现 AsyncCallback 的回调实现 IO 与事件处理 Watcher 和 AsyncCallback 的区别 我们先通过一个例子来感受一下: zooKeeper.getData(ro

ZooKeeper Watcher注意事项

zookeeper watch的定义如下:watch事件是一次性触发器,当watch监视的数据发生变化时,通知设置了该watch的client,即watcher. 需要注意三点: 1.一次性触发器 client在一个节点上设置watch,随后节点内容改变,client将获取事件.当节点内容再次改变,client不会获取这个事件,除非它又执行了一次读操作并设置watch 2.发送至client,watch事件延迟 watch事件异步发送至观察者.比如说client执行一次写操作,节点数据内容发生变

Zookeeper开源客户端框架Curator简介与示例

简介         Curator最初由Netflix的Jordan Zimmerman开发, Curator提供了一套Java类库, 可以更容易的使用ZooKeeper.         所谓ZooKeeper技巧(ZooKeeper Recipes),也可以称之为解决方案, 或者叫实现方案, 是指ZooKeeper的使用方法, 比如分布式的配置管理, Leader选举等         Curator作为Apache ZooKeeper天生配套的组件.ZooKeeper的Java开发者自然

跟着实例学习ZooKeeper的用法: Curator框架应用

前面的几篇文章介绍了一些ZooKeeper的应用方法, 本文将介绍Curator访问ZooKeeper的一些基本方法, 而不仅仅限于指定的Recipes, 你可以使用Curator API任意的访问ZooKeeper. CuratorFramework Curator框架提供了一套高级的API, 简化了ZooKeeper的操作. 它增加了很多使用ZooKeeper开发的特性,可以处理ZooKeeper集群复杂的连接管理和重试机制. 这些特性包括: 自动化的连接管理: 重新建立到ZooKeeper

关于zooKeeper,下面这个例子能保证数据安全吗

问题描述 package zooKeeper.zooKeeperLock; import java.io.IOException;import java.util.ArrayList;import java.util.Collections;import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;import java.util.concurrent

基于ZooKeeper的分布式锁和队列

在分布式系统中,往往需要一些分布式同步原语来做一些协同工作,上一篇文章介绍了Zookeeper的基本原理,本文介绍下基于Zookeeper的Lock和Queue的实现,主要代码都来自Zookeeper的官方recipe. 锁(Lock) 完全分布式锁是全局同步的,这意味着在任何时刻没有两个客户端会同时认为它们都拥有相同的锁,使用 Zookeeper 可以实现分布式锁,需要首先定义一个锁节点(lock root node). 需要获得锁的客户端按照以下步骤来获取锁: 保证锁节点(lock root

ZooKeeper客户端事件串行化处理

为了提升系统的性能,进一步提高系统的吞吐能力,最近公司很多系统都在进行异步化改造.在异步化改造的过程中,肯定会比以前碰到更多的多线程问题,上周就碰到ZooKeeper客户端异步化过程中的一个死锁问题,这里说明下. 通常ZooKeeper对于同一个API,提供了同步和异步两种调用方式. 同步接口很容易理解,使用方法如下: ZooKeeper zk = new ZooKeeper(...); List children = zk.getChildren( path, true ); 异步接口就相对复

zookeeper java.lang.NoSuchMethodError异常

问题描述 zookeeper java.lang.NoSuchMethodError异常 没找到问题所在 就是在myeclipse有出现 但在IDEA上就没有这个错误 错误如下: java.lang.NoSuchMethodError: org.apache.zookeeper.ZooKeeper.(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V at org.apache.curator.utils.DefaultZookeeperFa

基于ZooKeeper的一种简单分布式锁的实现

        ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务.基于ZooKeeper,我们可以实现一种简单的分布式互斥锁,包括可重入与不可重入.代码如下: import java.io.IOException; import java.util.ArrayList; import java.util.Random; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperExcept