跟着实例学习ZooKeeper的用法: Barrier

分布式Barrier是这样一个类: 它会阻塞所有节点上的等待进程,知道某一个被满足, 然后所有的节点继续进行。

比如赛马比赛中, 等赛马陆续来到起跑线前。 一声令下,所有的赛马都飞奔而出。

栅栏Barrier

DistributedBarrier类实现了栅栏的功能。 它的构造函数如下:

public DistributedBarrier(CuratorFramework client, String barrierPath)
Parameters:
client - client
barrierPath - path to use as the barrier

首先你需要设置栅栏,它将阻塞在它上面等待的线程:

setBarrier();

然后需要阻塞的线程调用“方法等待放行条件:

public void waitOnBarrier()

当条件满足时,移除栅栏,所有等待的线程将继续执行:

removeBarrier();

异常处理 DistributedBarrier 会监控连接状态,当连接断掉时waitOnBarrier()方法会抛出异常。

看一个例子:

package com.colobu.zkrecipe.barrier;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;

public class DistributedBarrierExample {
    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            ExecutorService service = Executors.newFixedThreadPool(QTY);
            DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
            controlBarrier.setBarrier();

            for (int i = 0; i < QTY; ++i) {
                final DistributedBarrier barrier = new DistributedBarrier(client, PATH);
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {

                        Thread.sleep((long) (3 * Math.random()));
                        System.out.println("Client #" + index + " waits on Barrier");
                        barrier.waitOnBarrier();
                        System.out.println("Client #" + index + " begins");
                        return null;
                    }
                };
                service.submit(task);
            }

            Thread.sleep(10000);
            System.out.println("all Barrier instances should wait the condition");

            controlBarrier.removeBarrier();

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

        }

    }

}

这个例子创建了controlBarrier来设置栅栏和移除栅栏。 我们创建了5个线程,在此Barrier上等待。 最后移除栅栏后所有的线程才继续执行。

如果你开始不设置栅栏,所有的线程就不会阻塞住。

双栅栏Double Barrier

双栅栏允许客户端在计算的开始和结束时同步。当足够的进程加入到双栅栏时,进程开始计算, 当计算完成时,离开栅栏。 双栅栏类是DistributedDoubleBarrier。 构造函数为:

public DistributedDoubleBarrier(CuratorFramework client,
                                String barrierPath,
                                int memberQty)
Creates the barrier abstraction. memberQty is the number of members in the barrier. When enter() is called, it blocks until
all members have entered. When leave() is called, it blocks until all members have left.

Parameters:
client - the client
barrierPath - path to use
memberQty - the number of members in the barrier

memberQty是成员数量,当enter方法被调用时,成员被阻塞,直到所有的成员都调用了enter。 当leave方法被调用时,它也阻塞调用线程, 知道所有的成员都调用了leave。 就像百米赛跑比赛, 发令枪响, 所有的运动员开始跑,等所有的运动员跑过终点线,比赛才结束。

DistributedBarrier 会监控连接状态,当连接断掉时enter()leave方法会抛出异常。

例子代码:

package com.colobu.zkrecipe.barrier;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;

public class DistributedBarrierExample {
    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {

                        Thread.sleep((long) (3 * Math.random()));
                        System.out.println("Client #" + index + " enters");
                        barrier.enter();
                        System.out.println("Client #" + index + " begins");
                        Thread.sleep((long) (3000 * Math.random()));
                        barrier.leave();
                        System.out.println("Client #" + index + " left");
                        return null;
                    }
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

        }

    }

}
时间: 2024-12-30 23:27:16

跟着实例学习ZooKeeper的用法: Barrier的相关文章

跟着实例学习ZooKeeper的用法: 计数器

这一篇文章我们将学习使用Curator来实现计数器. 顾名思义,计数器是用来计数的, 利用ZooKeeper可以实现一个集群共享的计数器. 只要使用相同的path就可以得到最新的计数器值, 这是由ZooKeeper的一致性保证的.Curator有两个计数器, 一个是用int来计数,一个用long来计数. SharedCount 这个类使用int类型来计数. 主要涉及三个类. SharedCount SharedCountReader SharedCountListener SharedCount

跟着实例学习ZooKeeper的用法: 分布式锁

锁 分布式的锁全局同步, 这意味着任何一个时间点不会有两个客户端都拥有相同的锁. 可重入锁Shared Reentrant Lock 首先我们先看一个全局可重入的锁. Shared意味着锁是全局可见的, 客户端都可以请求锁. Reentrant和JDK的ReentrantLock类似, 意味着同一个客户端在拥有锁的同时,可以多次获取,不会被阻塞. 它是由类InterProcessMutex来实现. 它的构造函数为: public InterProcessMutex(CuratorFramewor

跟着实例学习ZooKeeper的用法: Leader选举

ZooKeeper官方给出了使用zookeeper的几种用途. Leader Election Barriers Queues Locks Two-phased Commit 其它应用如Name Service, Configuration, Group Membership 在实际使用ZooKeeper开发中,我们最常用的是Apache Curator. 它由Netflix公司贡献给Apache,目前版本2.7.0. 相信你在使用ZK API开发时会遇到让人头疼的几个问题,ZK连接管理.SES

跟着实例学习ZooKeeper的用法: 临时节点

使用Curator也可以简化Ephemeral Node (临时节点)的操作. 临时节点驻存在ZooKeeper中,当连接和session断掉时被删除. 比如通过ZooKeeper发布服务,服务启动时将自己的信息注册为临时节点,当服务断掉时ZooKeeper将此临时节点删除,这样client就不会得到服务的信息了. PersistentEphemeralNode类代表临时节点. 通过下面的构造函数创建: public PersistentEphemeralNode(CuratorFramewor

跟着实例学习ZooKeeper的用法: Curator框架应用

前面的几篇文章介绍了一些ZooKeeper的应用方法, 本文将介绍Curator访问ZooKeeper的一些基本方法, 而不仅仅限于指定的Recipes, 你可以使用Curator API任意的访问ZooKeeper. CuratorFramework Curator框架提供了一套高级的API, 简化了ZooKeeper的操作. 它增加了很多使用ZooKeeper开发的特性,可以处理ZooKeeper集群复杂的连接管理和重试机制. 这些特性包括: 自动化的连接管理: 重新建立到ZooKeeper

跟着实例学习ZooKeeper的用法: 缓存

可以利用ZooKeeper在集群的各个节点之间缓存数据. 每个节点都可以得到最新的缓存的数据. Curator提供了三种类型的缓存方式:Path Cache,Node Cache 和Tree Cache. Path Cache Path Cache用来监控一个ZNode的子节点. 当一个子节点增加, 更新,删除时, Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态. 这也正如它的名字表示的那样, 那监控path. 实际使用时会涉及到四个类: PathChildren

跟着实例学习ZooKeeper的用法: Curator扩展库

还记得Curator提供哪几个组件吗? 我们不妨回顾一下: Recipes Framework Utilities Client Errors Extensions 前面的例子其实前五个组件都涉及到了, 比如Utilities例子的TestServer, Client里的CuratorZookeeperClient, Errors里的ConnectionStateListener等. 还有最后一个组件我们还没有介绍,那就是Curator扩展组件. Recipes组件包含了丰富的Curator应用

跟着实例学习ZooKeeper的用法: 队列

使用Curator也可以简化Ephemeral Node (临时节点)的操作.Curator也提供ZK Recipe的分布式队列实现. 利用ZK的 PERSISTENTSEQUENTIAL节点, 可以保证放入到队列中的项目是按照顺序排队的. 如果单一的消费者从队列中取数据, 那么它是先入先出的,这也是队列的特点. 如果你严格要求顺序,你就的使用单一的消费者,可以使用leader选举只让leader作为唯一的消费者. 但是, 根据Netflix的Curator作者所说, ZooKeeper真心不适

分布式 ZooKeeper 缓存用法实例教程

可以利用ZooKeeper在集群的各个节点之间缓存数据. 每个节点都可以得到最新的缓存的数据. Curator提供了三种类型的缓存方式:Path Cache,Node Cache 和Tree Cache. Path Cache Path Cache用来监控一个ZNode的子节点. 当一个子节点增加, 更新,删除时, Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态. 这也正如它的名字表示的那样, 那监控path. 实际使用时会涉及到四个类:     PathChil