ZooKeeper客户端事件串行化处理

为了提升系统的性能,进一步提高系统的吞吐能力,最近公司很多系统都在进行异步化改造。在异步化改造的过程中,肯定会比以前碰到更多的多线程问题,上周就碰到ZooKeeper客户端异步化过程中的一个死锁问题,这里说明下。

通常ZooKeeper对于同一个API,提供了同步和异步两种调用方式。
同步接口很容易理解,使用方法如下:

ZooKeeper zk = new ZooKeeper(...);
List children = zk.getChildren( path, true );

异步接口就相对复杂一点,使用方法如下:

ZooKeeper zk = new ZooKeeper(...);
zk.getChildren( path, true, new AsyncCallback.Children2Callback() {
			@Override
			public void processResult( int rc, String path, Object ctx, List children, Stat stat ) {
				System.out.println( "Recive the response." );
			}
}, null);

 

我们可以看到,异步调用中,需要注册一个Children2Callback,并实现回调方法:processResult。

上周碰到这样的问题:应用注册了对某znode子节点列表变化的监听,逻辑是在接受到ZooKeeper服务器节点列表变更通知(EventType.NodeChildrenChanged)的时候,会重新获取一次子节点列表。之前,他们是使用同步接口,整个应用可以正常运行,但是这次异步化改造后,出现了诡异现象,能够收到子节点的变更通知,但是无法重新获取子节点列表了。

下面,我首先把应用之前使用同步接口的逻辑代码,用一个简单的demo来演示下,如下:

package book.chapter05;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;

/**
 * ZooKeeper API 获取子节点列表,使用同步(sync)接口。
 * @author <a href="mailto:nileader@gmail.com">银时</a>
 */
public class ZooKeeper_GetChildren_API_Sync_Usage implements Watcher {

	private CountDownLatch connectedSemaphore = new CountDownLatch( 1 );
	private static CountDownLatch _semaphore = new CountDownLatch( 1 );
	private ZooKeeper zk;

	ZooKeeper createSession( String connectString, int sessionTimeout, Watcher watcher ) throws IOException {
		ZooKeeper zookeeper = new ZooKeeper( connectString, sessionTimeout, watcher );
		try {
			connectedSemaphore.await();
		} catch ( InterruptedException e ) {
		}
		return zookeeper;
	}

	/** create path by sync */
	void createPath_sync( String path, String data, CreateMode createMode ) throws IOException, KeeperException, InterruptedException {

		if ( zk == null ) {
			zk = this.createSession( "domain1.book.zookeeper:2181", 5000, this );
		}
		zk.create( path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, createMode );
	}

	/** Get children znodes of path and set watches */
	List getChildren( String path ) throws KeeperException, InterruptedException, IOException{

		System.out.println( "===Start to get children znodes.===" );
		if ( zk == null ) {
			zk = this.createSession( "domain1.book.zookeeper:2181", 5000, this );
		}
		return zk.getChildren( path, true );
	}

	public static void main( String[] args ) throws IOException, InterruptedException {

		ZooKeeper_GetChildren_API_Sync_Usage sample = new ZooKeeper_GetChildren_API_Sync_Usage();
		String path = "/get_children_test";

		try {

			sample.createPath_sync( path, "", CreateMode.PERSISTENT );
			sample.createPath_sync( path + "/c1", "", CreateMode.PERSISTENT );
			List childrenList = sample.getChildren( path );
			System.out.println( childrenList );
			//Add a new child znode to test watches event notify.
			sample.createPath_sync( path + "/c2", "", CreateMode.PERSISTENT );
			_semaphore.await();
		} catch ( KeeperException e ) {
			System.err.println( "error: " + e.getMessage() );
			e.printStackTrace();
		}
	}

	/**
	 * Process when receive watched event
	 */
	@Override
	public void process( WatchedEvent event ) {
		System.out.println( "Receive watched event:" + event );
		if ( KeeperState.SyncConnected == event.getState() ) {

			if( EventType.None == event.getType() &amp;&amp; null == event.getPath() ){
				connectedSemaphore.countDown();
			}else if( event.getType() == EventType.NodeChildrenChanged ){
				//children list changed
				try {
					System.out.println( this.getChildren( event.getPath() ) );
					_semaphore.countDown();
				} catch ( Exception e ) {}
			}

		}
	}
}

输出结果如下:

Receive watched event:WatchedEvent state:SyncConnected type:None path:null
===Start to get children znodes.===
[c1]
Receive watched event:WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/get_children_test
===Start to get children znodes.===
[c1, c2]

在上面这个程序中,我们首先创建了一个父节点: /get_children_test,以及一个子节点:/get_children_test/c1。然后调用getChildren的同步接口来获取/get_children_test节点下的所有子节点,调用的同时注册一个watches。之后,我们继续向/get_children_test节点创建子节点:/get_children_test/c2,这个时候,因为我们之前我们注册了一个watches,因此,一旦此时有子节点被创建,ZooKeeper Server就会向客户端发出“子节点变更”的通知,于是,客户端可以再次调用getChildren方法来获取新的子节点列表。

这个例子当然是能够正常运行的。现在,我们进行异步化改造,如下:

package book.chapter05;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;

/**
 * ZooKeeper API 获取子节点列表,使用异步(ASync)接口。
 * @author <a href="mailto:nileader@gmail.com">银时</a>
 */
public class ZooKeeper_GetChildren_API_ASync_Usage_Deadlock implements Watcher {

	private CountDownLatch connectedSemaphore = new CountDownLatch( 1 );
	private static CountDownLatch _semaphore = new CountDownLatch( 1 );
	private ZooKeeper zk;

	ZooKeeper createSession( String connectString, int sessionTimeout, Watcher watcher ) throws IOException {
		ZooKeeper zookeeper = new ZooKeeper( connectString, sessionTimeout, watcher );
		try {
			connectedSemaphore.await();
		} catch ( InterruptedException e ) {
		}
		return zookeeper;
	}

	/** create path by sync */
	void createPath_sync( String path, String data, CreateMode createMode ) throws IOException, KeeperException, InterruptedException {

		if ( zk == null ) {
			zk = this.createSession( "domain1.book.zookeeper:2181", 5000, this );
		}
		zk.create( path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, createMode );
	}

	/** Get children znodes of path and set watches */
	void getChildren( String path ) throws KeeperException, InterruptedException, IOException{

		System.out.println( "===Start to get children znodes.===" );
		if ( zk == null ) {
			zk = this.createSession( "domain1.book.zookeeper:2181", 5000, this );
		}

		final CountDownLatch _semaphore_get_children = new CountDownLatch( 1 );

		zk.getChildren( path, true, new AsyncCallback.Children2Callback() {
			@Override
			public void processResult( int rc, String path, Object ctx, List children, Stat stat ) {

				System.out.println( "Get Children znode result: [response code: " + rc + ", param path: " + path + ", ctx: " + ctx + ", children list: "
						+ children + ", stat: " + stat );
				_semaphore_get_children.countDown();
			}
		}, null);
		_semaphore_get_children.await();
	}

	public static void main( String[] args ) throws IOException, InterruptedException {

		ZooKeeper_GetChildren_API_ASync_Usage_Deadlock sample = new ZooKeeper_GetChildren_API_ASync_Usage_Deadlock();
		String path = "/get_children_test";

		try {
			sample.createPath_sync( path, "", CreateMode.PERSISTENT );
			sample.createPath_sync( path + "/c1", "", CreateMode.PERSISTENT );
			//Get children and register watches.
			sample.getChildren( path );
			//Add a new child znode to test watches event notify.
			sample.createPath_sync( path + "/c2", "", CreateMode.PERSISTENT );

			_semaphore.await();
		} catch ( KeeperException e ) {
			System.err.println( "error: " + e.getMessage() );
			e.printStackTrace();
		}
	}

	/**
	 * Process when receive watched event
	 */
	@Override
	public void process( WatchedEvent event ) {
		System.out.println( "Receive watched event:" + event );
		if ( KeeperState.SyncConnected == event.getState() ) {

			if( EventType.None == event.getType() &amp;&amp; null == event.getPath() ){
				connectedSemaphore.countDown();
			}else if( event.getType() == EventType.NodeChildrenChanged ){
				//children list changed
				try {
					this.getChildren( event.getPath() );
					_semaphore.countDown();
				} catch ( Exception e ) {
					e.printStackTrace();
				}
			}

		}
	}
}

输出结果如下:

Receive watched event:WatchedEvent state:SyncConnected type:None path:null
===Start to get children znodes.===
Get Children znode result: [response code: 0, param path: /get_children_test, ctx: null, children list: [c1], stat: 555,555,1373931727380,1373931727380,0,1,0,0,0,1,556

Receive watched event:WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/get_children_test
===Start to get children znodes.===

在上面这个demo中,执行逻辑和之前的同步版本基本一致,唯一有区别的地方在于获取子节点列表的过程异步化了。这样一改造,问题就出来了,整个程序在进行第二次获取节点列表的时候,卡住了。和应用方确认了,之前同步版本从来没有出现过这个现象的,所以开始排查这个异步化中哪里会阻塞。

这里,我们重点讲解在ZooKeeper客户端中,需要处理来自服务端的两类事件通知:一类是Watches时间通知,另一类则是异步接口调用的响应。值得一提的是,在ZooKeeper的客户端线程模型中,这两个事件由同一个线程处理,并且是串行处理。具体可以自己查看事件处理的核心类:org.apache.zookeeper.ClientCnxn.EventThread。

时间: 2024-08-18 16:20:53

ZooKeeper客户端事件串行化处理的相关文章

XML串行化数据基础

xml|数据 XML文档使用越来越普遍,我们常会把一些系统设置等文件用config或XML文档存文在程序目录下..Net的串行化数据是一个很令人心动的技术,可以很方便地读取一定格式或保存成文件.下面做个简单了练习,对XML串行化作些初步了解.(注:我是在WINDOWS应用程序上做的练习,因怕建立ASP.NETA工程麻烦,下面一大堆东西.其实除了路径外,程序写法基本一致). 先编写一个需要串行化的类,该类是设置一个操作权限的设置文件 using System; namespace Hellosea

XML串行化数据的基础

xml|数据 XML文档使用越来越普遍,我们常会把一些系统设置等文件用config或XML文档存文在程序目录下..Net的串行化数据是一个很令人心动的技术,可以很方便地读取一定格式或保存成文件.下面做个简单了练习,对XML串行化作些初步了解.(注:我是在WINDOWS应用程序上做的练习,因怕建立ASP.NETA工程麻烦,下面一大堆东西.其实除了路径外,程序写法基本一致). 先编写一个需要串行化的类,该类是设置一个操作权限的设置文件 using System; namespace Hellosea

.NET使XML串行化易如反掌

xml   人们一直高喊XML是解决系统互联问题的关键, 而.NET framework 也为处理XML数据提供了许多不同的类库. XmlDocument 类能让你像处理文件一样处理XML 数据, 而XmlReader, XmlWriter, 和它们的派生类使你能够将XML 数据做为数据流处理. XmlSerializer 则提供了另外的方法, 它使你能够将自己的对象串行和反串行化为XML. 串行化数据既能够让你像处理文件一样对数据进行随机存取, 同时又能够跳过你不感兴趣的元素. 在本文中, 我

PHP5.0对象模型探索之对象串行化

php5|对象     串行化可以把变量包括对象,转化成连续bytes数据,你可以将串行化后的变量存在一个文件里或在网络上传输,然后再反串行化还原为原来的数据.你在反串行化类的对象之前定义的类,PHP可以成功地存储其对象的属性和方法. 有时你可能需要一个对象在反串行化后立即执行.为了这样的目的,PHP会自动寻找__sleep和__wakeup方法. 当一个对象被串行化,PHP会调用__sleep方法(如果存在的话). 在反串行化一个对象后,PHP 会调用__wakeup方法. 这两个方法都不接受

PHP中的串行化变量和序列化对象

变量|对象     串行化大概就是把一些变量转化成为字符串的字节流的形式,这样比较容易传输.存储.当然,关是传输存储没有什么,关键是变成串的形式以后还能够转化回来,而且能够保持原来数据的结构. 在PHP中有多串行化处理的函数:serialize(),该函数把任何变量值(除了资源变量)转化为字符串的形式,可以把字符串保存到文件里,或者注册为Session,乃至于使用curl来模拟GET/POST来传输变量,达到RPC的效果. 如果要将串行化的变量转化成PHP原始的变量值,那么可以使用unseria

第十三节--对象串行化 -- Classes and Objects in PHP5 [13]

object|php5|对象 +-------------------------------------------------------------------------------+| = 本文为Haohappy读<<Core PHP Programming>> | = 中Classes and Objects一章的笔记 | = 翻译为主+个人心得 | = 为避免可能发生的不必要的麻烦请勿转载,谢谢 | = 欢迎批评指正,希望和所有PHP爱好者共同进步! +-------

使用MFC串行化数据和C++对象

串行化数据 --例子程序:Memo 创建一个新的单文档 SDI 应用,视图类选择 CFormView,以便用户可以在窗口中输入. 在界面中创建三个编辑框,然后再添加三个相应的编辑框变量.这三个变量是视图类的成员变量,为了交互数据,文档类中也要创建三个对应的变量.然后,文档类和视图类都要对数据成员进行初始化操作,在文档类中这个工作通常都在 OnNewDocument() 函数中进行.因为下面任何一个操作发生时都触发文档类 OnNewDocument()函数执行: 当用户启动应用程序: 当用户在"F

实例解析C++/CLI的串行化

串行化可使对象被转换为某种外部的形式,比如以文件存储的形式供程序使用,或通过程序间的通讯发送到另一个处理过程.转换为外部形式的过程称为"串行化",而逆过程称为"反串行化". 简介 请看例1中的示例,其将多个对象类型的值写入到一个新的磁盘文件中,关闭文件,接着再把这些值重新读取到内存中. 例1: using namespace System; using namespace System::IO; using namespace System::Runtime::Se

怎样给串行化类分配版本号(可配置版本模式)

编写可串行化类时,MFC用你指定的模式号制定一个粗略的版本控制方式.在向档案写数据时, MFC用模式标记该类的实例:而在读回数据时,MFC将档案中的记录的模式号和应用程序中使用着的该类对象的模式号做比较,如果两模式号不匹配,则MFC发送一个CArchiveException,其m_cause 等于CArchiveException::badSchema.没有得到处理的该类异常会促使MFC显示一个对话框,提示 "非预期的文件格式".如果每次修改对象的串行化存储格式时都能做到增加模式号,那