分布式 ZooKeeper 缓存用法实例教程

可以利用ZooKeeper在集群的各个节点之间缓存数据。 每个节点都可以得到最新的缓存的数据。 Curator提供了三种类型的缓存方式:Path Cache,Node Cache 和Tree Cache。

Path Cache

Path Cache用来监控一个ZNode的子节点. 当一个子节点增加, 更新,删除时, Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态。 这也正如它的名字表示的那样, 那监控path。

实际使用时会涉及到四个类:

    PathChildrenCache
    PathChildrenCacheEvent
    PathChildrenCacheListener
    ChildData

通过下面的构造函数创建Path Cache:

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)

想使用cache,必须调用它的start方法,不用之后调用close方法。 start有两个, 其中一个可以传入StartMode,用来为初始的cache设置暖场方式(warm):

    NORMAL: 初始时为空。
    BUILD_INITIAL_CACHE: 在这个方法返回之前调用rebuild()。
    POST_INITIALIZED_EVENT: 当Cache初始化数据后发送一个PathChildrenCacheEvent.Type#INITIALIZED事件

public void addListener(PathChildrenCacheListener listener)可以增加listener监听缓存的改变。

getCurrentData()方法返回一个List<ChildData>对象,可以遍历所有的子节点。

这个例子摘自官方的例子, 实现了一个控制台的方式操作缓存。 它提供了三个命令, 你可以在控制台中输入。

    set 用来新增或者更新一个子节点的值, 也就是更新一个缓存对象
    remove 是删除一个缓存对象
    list 列出所有的缓存对象

另外还提供了一个help命令提供帮助。

设置/更新、移除其实是使用client (CuratorFramework)来操作, 不通过PathChildrenCache操作:

client.setData().forPath(path, bytes);
client.create().creatingParentsIfNeeded().forPath(path, bytes);
client.delete().forPath(path);

而查询缓存使用下面的方法:

for (ChildData data : cache.getCurrentData()) {
    System.out.println(data.getPath() + " = " + new String(data.getData()));
}

Node Cache

Path Cache用来监控一个ZNode. 当节点的数据修改或者删除时,Node Cache能更新它的状态包含最新的改变。

涉及到下面的三个类:

    NodeCache
    NodeCacheListener
    ChildData

想使用cache,依然要调用它的start方法,不用之后调用close方法。

getCurrentData()将得到节点当前的状态,通过它的状态可以得到当前的值。 可以使用public void addListener(NodeCacheListener listener)监控节点状态的改变。

我们依然使用上面的例子框架来演示Node Cache。

package com.colobu.zkrecipe.cache;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Arrays;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.KeeperException;

public class NodeCacheExample {
    private static final String PATH = "/example/nodeCache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        NodeCache cache = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            cache = new NodeCache(client, PATH);
            cache.start();
            processCommands(client, cache);
        } finally {
            CloseableUtils.closeQuietly(cache);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static void addListener(final NodeCache cache) {
        // a PathChildrenCacheListener is optional. Here, it's used just to log
        // changes
        NodeCacheListener listener = new NodeCacheListener() {

            @Override
            public void nodeChanged() throws Exception {
                if (cache.getCurrentData() != null)
                    System.out.println("Node changed: " + cache.getCurrentData().getPath() + ", value: " + new String(cache.getCurrentData().getData()));
            }
        };
        cache.getListenable().addListener(listener);
    }

    private static void processCommands(CuratorFramework client, NodeCache cache) throws Exception {
        printHelp();
        try {
            addListener(cache);
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            boolean done = false;
            while (!done) {
                System.out.print("> ");
                String line = in.readLine();
                if (line == null) {
                    break;
                }
                String command = line.trim();
                String[] parts = command.split("s");
                if (parts.length == 0) {
                    continue;
                }
                String operation = parts[0];
                String args[] = Arrays.copyOfRange(parts, 1, parts.length);
                if (operation.equalsIgnoreCase("help") || operation.equalsIgnoreCase("?")) {
                    printHelp();
                } else if (operation.equalsIgnoreCase("q") || operation.equalsIgnoreCase("quit")) {
                    done = true;
                } else if (operation.equals("set")) {
                    setValue(client, command, args);
                } else if (operation.equals("remove")) {
                    remove(client);
                } else if (operation.equals("show")) {
                    show(cache);
                }
                Thread.sleep(1000); // just to allow the console output to catch
                                    // up
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {

        }
    }

    private static void show(NodeCache cache) {
        if (cache.getCurrentData() != null)
            System.out.println(cache.getCurrentData().getPath() + " = " + new String(cache.getCurrentData().getData()));
        else
            System.out.println("cache don't set a value");
    }

    private static void remove(CuratorFramework client) throws Exception {
        try {
            client.delete().forPath(PATH);
        } catch (KeeperException.NoNodeException e) {
            // ignore
        }
    }

    private static void setValue(CuratorFramework client, String command, String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("syntax error (expected set <value>): " + command);
            return;
        }

        byte[] bytes = args[0].getBytes();
        try {
            client.setData().forPath(PATH, bytes);
        } catch (KeeperException.NoNodeException e) {
            client.create().creatingParentsIfNeeded().forPath(PATH, bytes);
        }
    }

    private static void printHelp() {
        System.out.println("An example of using PathChildrenCache. This example is driven by entering commands at the prompt:n");
        System.out.println("set <value>: Adds or updates a node with the given name");
        System.out.println("remove: Deletes the node with the given name");
        System.out.println("show: Display the node's value in the cache");
        System.out.println("quit: Quit the example");
        System.out.println();
    }
}

操作和上面的Path cache类似, 只是getCurrentData()返回的类型不同

Tree Node

这种类型的即可以监控节点的状态,还监控节点的子节点的状态, 类似上面两种cache的组合。 这也就是Tree的概念。 它监控整个树中节点的状态。 涉及到下面四个类。

    TreeCache
    TreeCacheListener
    TreeCacheEvent
    ChildData

而关键的TreeCache的构造函数为

public TreeCache(CuratorFramework client, String path, boolean cacheData)

想使用cache,依然要调用它的start方法,不用之后调用close方法。

getCurrentChildren()返回cache的状态,类型为Map<String, ChildData>。 而getCurrentData()返回监控的path的数据。

public void addListener(TreeCacheListener listener)可以增加listener来监控状态的改变。

例子依然采用和上面的例子类似, 尤其和Path Cache类似。

package com.colobu.zkrecipe.cache;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.Map;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.KeeperException;

public class TreeCacheExample {
    private static final String PATH = "/example/treeCache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        TreeCache cache = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            cache = new TreeCache(client, PATH);
            cache.start();
            processCommands(client, cache);
        } finally {
            CloseableUtils.closeQuietly(cache);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static void addListener(final TreeCache cache) {
        TreeCacheListener listener = new TreeCacheListener() {

            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                switch (event.getType()) {
                case NODE_ADDED: {
                    System.out.println("TreeNode added: " + ZKPaths.getNodeFromPath(event.getData().getPath()) + ", value: "
                            + new String(event.getData().getData()));
                    break;
                }
                case NODE_UPDATED: {
                    System.out.println("TreeNode changed: " + ZKPaths.getNodeFromPath(event.getData().getPath()) + ", value: "
                            + new String(event.getData().getData()));
                    break;
                }
                case NODE_REMOVED: {
                    System.out.println("TreeNode removed: " + ZKPaths.getNodeFromPath(event.getData().getPath()));
                    break;
                }
                default:
                    System.out.println("Other event: " + event.getType().name());
                }
            }

        };

        cache.getListenable().addListener(listener);
    }

    private static void processCommands(CuratorFramework client, TreeCache cache) throws Exception {
        // More scaffolding that does a simple command line processor
        printHelp();
        try {
            addListener(cache);
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            boolean done = false;
            while (!done) {
                System.out.print("> ");
                String line = in.readLine();
                if (line == null) {
                    break;
                }
                String command = line.trim();
                String[] parts = command.split("s");
                if (parts.length == 0) {
                    continue;
                }
                String operation = parts[0];
                String args[] = Arrays.copyOfRange(parts, 1, parts.length);
                if (operation.equalsIgnoreCase("help") || operation.equalsIgnoreCase("?")) {
                    printHelp();
                } else if (operation.equalsIgnoreCase("q") || operation.equalsIgnoreCase("quit")) {
                    done = true;
                } else if (operation.equals("set")) {
                    setValue(client, command, args);
                } else if (operation.equals("remove")) {
                    remove(client, command, args);
                } else if (operation.equals("list")) {
                    list(cache);
                }
                Thread.sleep(1000); // just to allow the console output to catch
                                    // up
            }
        } finally {

        }
    }

    private static void list(TreeCache cache) {
        if (cache.getCurrentChildren(PATH).size() == 0) {
            System.out.println("* empty *");
        } else {           
            for (Map.Entry<String, ChildData> entry : cache.getCurrentChildren(PATH).entrySet()) {
                System.out.println(entry.getKey() + " = " + new String(entry.getValue().getData()));
            }
        }
    }

    private static void remove(CuratorFramework client, String command, String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("syntax error (expected remove <path>): " + command);
            return;
        }
        String name = args[0];
        if (name.contains("/")) {
            System.err.println("Invalid node name" + name);
            return;
        }
        String path = ZKPaths.makePath(PATH, name);
        try {
            client.delete().forPath(path);
        } catch (KeeperException.NoNodeException e) {
            // ignore
        }
    }

    private static void setValue(CuratorFramework client, String command, String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("syntax error (expected set <path> <value>): " + command);
            return;
        }
        String name = args[0];
        if (name.contains("/")) {
            System.err.println("Invalid node name" + name);
            return;
        }
        String path = ZKPaths.makePath(PATH, name);
        byte[] bytes = args[1].getBytes();
        try {
            client.setData().forPath(path, bytes);
        } catch (KeeperException.NoNodeException e) {
            client.create().creatingParentsIfNeeded().forPath(path, bytes);
        }
    }

    private static void printHelp() {
        System.out.println("An example of using PathChildrenCache. This example is driven by entering commands at the prompt:n");
        System.out.println("set <name> <value>: Adds or updates a node with the given name");
        System.out.println("remove <name>: Deletes the node with the given name");
        System.out.println("list: List the nodes/values in the cache");
        System.out.println("quit: Quit the example");
        System.out.println();
    }
}

时间: 2024-08-04 04:17:17

分布式 ZooKeeper 缓存用法实例教程的相关文章

Python中zip()函数用法实例教程_python

本文实例讲述了Python中zip()函数的定义及用法,相信对于Python初学者有一定的借鉴价值.详情如下: 一.定义: zip([iterable, ...])zip()是Python的一个内建函数,它接受一系列可迭代的对象作为参数,将对象中对应的元素打包成一个个tuple(元组),然后返回由这些tuples组成的list(列表).若传入参数的长度不等,则返回list的长度和参数中长度最短的对象相同.利用*号操作符,可以将list unzip(解压). 二.用法示例: 读者看看下面的例子,对

Python中apply函数的用法实例教程_python

一.概述: python apply函数的具体含义如下:  apply(func [, args [, kwargs ]]) 函数用于当函数参数已经存在于一个元组或字典中时,间接地调用函数.args是一个包含将要提供给函数的按位置传递的参数的元组.如果省略了args,任何参数都不会被传递,kwargs是一个包含关键字参数的字典.   apply()的返回值就是func()的返回值,apply()的元素参数是有序的,元素的顺序必须和func()形式参数的顺序一致 二.使用示例: 下面给几个例子来详

Python迭代用法实例教程_python

本文实例讲述了Python中迭代的用法,是一个非常实用的技巧.分享给大家供大家参考借鉴之用.具体分析如下: 如果给定一个list或tuple,我们可以通过for循环来遍历这个list或tuple,这种遍历我们成为迭代(Iteration). 在Python中,迭代是通过for ... in来完成的,而很多语言比如C或者Java,迭代list是通过下标完成的,比如Java代码: for (i=0; i<list.length; i++) { n = list[i]; } 可以看出,Python的f

Python内置函数的用法实例教程_python

本文简单的分析了Python中常用的内置函数的用法,分享给大家供大家参考之用.具体分析如下: 一般来说,在Python中内置了很多有用的函数,我们可以直接调用. 而要调用一个函数,就需要知道函数的名称和参数,比如求绝对值的函数abs,只有一个参数.可以直接从Python的官方网站查看文档:http://docs.python.org/2/library/functions.html#abs 也可以在交互式命令行通过help(abs)查看abs函数的帮助信息. 调用abs函数: >>> a

Python中函数的用法实例教程_python

本文以数值计算为例讲述了Python中函数的用法,分享给大家供大家参考借鉴之用.具体如下: 我们都知道圆的面积计算公式为: S = πr2 当我们知道半径r的值时,就可以根据公式计算出面积.假设我们需要计算3个不同大小的圆的面积: r1 = 12.34 r2 = 9.08 r3 = 73.1 s1 = 3.14 * r1 * r1 s2 = 3.14 * r2 * r2 s3 = 3.14 * r3 * r3 当代码出现有规律的重复的时候,你就需要当心了,每次写3.14 * x * x不仅很麻烦

python的类变量和成员变量用法实例教程_python

本文实例形式讲解了python的类变量和成员变量用法,对于Python程序设计有一定的参考价值.分享给大家供大家参考.具体如下: 先看看下面这段代码: class TestClass(object): val1 = 100 def __init__(self): self.val2 = 200 def fcn(self,val = 400): val3 = 300 self.val4 = val self.val5 = 500 if __name__ == '__main__': inst =

sogou地图API用法实例教程_javascript技巧

本文实例讲述了sogou地图API应用,是非常实用的技巧.分享给大家供大家参考.具体实现方法如下: 地图的初始化 1.添加引用地图的API文件: <script src="http://xiazai.jb51.net/201409/other/api_v2.5.1.js" type="text/javascript"></script> 2.网站初始化加载事件: window.onload = function () { var map =

Yii查询生成器(Query Builder)用法实例教程_php实例

本文为yii官网英文文档的翻译版本,主要介绍了Yii查询生成器(Query Builder)的用法.分享给大家供大家参考之用.具体如下: 首先,Yii的查询生成器提供了用面向对象的方式写SQL语句.它允许开发人员使用类的方法和属性来指定一个SQL语句的各个部分.然后,组装成一个有效的SQL语句,可以通过调用DAO数据访问对象的描述方法为进一步执行.以下显示了一个典型的使用查询生成器建立一个select语句: $user = Yii::app()->db->createCommand() -&g

Python学习之asyncore模块用法实例教程_python

本文以实例分析了Python中asyncore模块的原理及用法,分享给大家供大家参考.具体分析如下: asyncore库是python的一个标准库,它是一个异步socket的包装.我们操作网络的时候可以直接使用socket等底层的库,但是asyncore使得我们可以更加方便的操作网络,避免直接使用socket,select,poll等工具时需要面对的复杂. 这个库很简单,包含了一个函数和一个类* loop()函数* dispatcher基类需要注意的是,loop函数是全局的,不是dispatch