linux中Zookeeper实战之分布式锁示例

首先准备一个Zookeeper集群环境,这里使用单机模拟集群环境,并使用代码方式启动服务。

Zookeeper服务

这里假定启动三个Zookeeper服务做集群

package my.zookeeperstudy;

import org.apache.commons.io.FileUtils;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.QuorumPeerMain;

import java.io.File;
import java.net.InetAddress;
import java.util.Properties;

public class ZKServer {
    protected String id = null;
    protected String dataDir = null;
    protected String clientPort = null;

    public ZKServer(String id, String dataDir, String clientPort) {
        this.id = id;
        this.dataDir = dataDir;
        this.clientPort = clientPort;
    }

    public void startServer() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Properties props = new Properties();
                    props.setProperty("tickTime", "2000");
                    props.setProperty("dataDir", dataDir);
                    FileUtils.write(new File(props.getProperty("dataDir"), "myid"), id);
                    props.setProperty("clientPort", clientPort);
                    props.setProperty("initLimit", "10");
                    props.setProperty("syncLimit", "5");
                    String hostname = InetAddress.getLocalHost().getHostName();
                    props.setProperty("server.1", hostname + ":2881:3881");
                    props.setProperty("server.2", hostname + ":2882:3882");
                    props.setProperty("server.3", hostname + ":2883:3883");

                    QuorumPeerConfig quorumConfig = new QuorumPeerConfig();
                    quorumConfig.parseProperties(props);

                    QuorumPeerMain quorumPeerMain = new QuorumPeerMain();
                    quorumPeerMain.runFromConfig(quorumConfig);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

}
package my.zookeeperstudy;

public class ZKServer1 {

    public static void main(String[] args) throws Exception {
        ZKServer zkServer = new ZKServer("1", "/tmp/zookeeper1/data", "2181");
        zkServer.startServer();
    }

}
package my.zookeeperstudy;

public class ZKServer2 {

    public static void main(String[] args) throws Exception {
        ZKServer zkServer = new ZKServer("2", "/tmp/zookeeper2/data", "2182");
        zkServer.startServer();
    }

}
package my.zookeeperstudy;

public class ZKServer3 {

    public static void main(String[] args) throws Exception {
        ZKServer zkServer = new ZKServer("3", "/tmp/zookeeper3/data", "2183");
        zkServer.startServer();
    }

}
分布式锁类DistributedLocker

package my.zookeeperstudy.lock;

import org.apache.zookeeper.*;

import java.util.Collections;
import java.util.List;

public class DistributedLocker {
    private final ZooKeeper zk;
    private final String lockBasePath;
    private final String lockName;
    private String lockPath;

    public DistributedLocker(ZooKeeper zk, String lockBasePath, String lockName) {
        this.zk = zk;
        this.lockBasePath = lockBasePath;
        this.lockName = lockName;
    }

    public void getLock() throws KeeperException, InterruptedException {
        if (zk.exists(lockBasePath, true) == null) {
            zk.create(lockBasePath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        lockPath = zk.create(lockBasePath + "/" + lockName, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("getLock path: " + lockPath);
        final Object lock = new Object();
        synchronized (lock) {
            while (true) {
                List<String> nodes = zk.getChildren(lockBasePath, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        synchronized (lock) {
                            lock.notifyAll();
                        }
                    }
                });
                Collections.sort(nodes);
                System.out.println(nodes);
                if (lockPath.endsWith(nodes.get(0))) {
                    return;
                } else {
                    lock.wait();
                }
            }
        }
    }

    public void releaseLock() throws KeeperException, InterruptedException {
        zk.delete(lockPath, -1);
        lockPath = null;
    }
}
测试客户端类

这里仍然模拟有三个客户端

package my.zookeeperstudy.lock;

import org.apache.zookeeper.*;

public class ZKClient {
    private final String lockBasePath = "/mylocks";
    private final String lockName = "mylock";

    public void start(String url) throws Exception {
        final ZooKeeper zk = new ZooKeeper(url, 10000, new Watcher() {
            public void process(WatchedEvent event) {
                System.out.println("event: " + event.getType());
            }
        });

        try {
            DistributedLocker locker = new DistributedLocker(zk, lockBasePath, lockName);

            System.out.println("before get lock");
            locker.getLock();
            System.out.println("after get lock");

            System.out.println("do something");
            Thread.sleep(60 * 1000);

            System.out.println("before release lock");
            locker.releaseLock();
            System.out.println("after release lock");
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}
package my.zookeeperstudy.lock;

public class ZKClient1 {
    public static void main(String[] args) throws Exception {
        ZKClient zkClient = new ZKClient();
        zkClient.start("localhost:2181");
    }
}
package my.zookeeperstudy.lock;

public class ZKClient2 {
    public static void main(String[] args) throws Exception {
        ZKClient zkClient = new ZKClient();
        zkClient.start("localhost:2182");
    }
}
package my.zookeeperstudy.lock;

public class ZKClient3 {
    public static void main(String[] args) throws Exception {
        ZKClient zkClient = new ZKClient();
        zkClient.start("localhost:2183");
    }
}
测试

首先依次启动ZKServer1,ZKServer2和ZKServer3,启动过程会有错误,可以忽略,那是因为检测别的节点的时候连接失败导致,生产环境可以针对具体情况忽略此类错误。

然后依次启动ZKClient1,ZKClient2和ZKClient3。

分别观察三个Client日志输出。

停止一个Client比如ZKClient1然后观察日志输出。

时间: 2024-09-17 20:38:18

linux中Zookeeper实战之分布式锁示例的相关文章

java使用zookeeper实现的分布式锁示例_java

使用zookeeper实现的分布式锁 分布式锁,实现了Lock接口 复制代码 代码如下: package com.concurrent; import java.io.IOException;import java.util.ArrayList;import java.util.Collections;import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeU

linux中Zookeeper实战之选举

最近整理资料的时候发现了两年前自己写的一些Zookeeper的例子,今天整理了一下,放到这里,也许以后用的着. 首先准备一个Zookeeper集群环境,这里使用单机模拟集群环境,并使用代码方式启动服务. Zookeeper服务 这里假定启动三个Zookeeper服务做集群 package my.zookeeperstudy; import org.apache.commons.io.FileUtils; import org.apache.zookeeper.server.quorum.Quor

ZooKeeper 笔记(6) 分布式锁

目前分布式锁,比较成熟.主流的方案有基于redis及基于zookeeper的二种方案. 大体来讲,基于redis的分布式锁核心指令为SETNX,即如果目标key存在,写入缓存失败返回0,反之如果目标key不存在,写入缓存成功返回1,通过区分这二个不同的返回值,可以认为SETNX成功即为获得了锁. redis分布式锁,看上去很简单,但其实要考虑周全,并不容易,网上有一篇文章讨论得很详细:http://blog.csdn.net/ugg/article/details/41894947/,有兴趣的可

linux中利用mysqlhotcopy备份数据脚本示例

mysqlhotcopy只是简单的缓存写入和文件复制的过程,占用资源和备份速度比mysqldump快很多很多.特别适合大的数据库,但需要注意的是:mysqlhotcopy只支持MyISAM 引擎 1.安装perl  代码如下 复制代码 #yum -y install perl perl-DBI 使用mysqlhotcopy需要安装perl支持,因为mysqlhotcopy是perl写的 2.安装DBD-mysql  代码如下 复制代码 #wget http://www.fcbu.com/upim

linux中vsftpd虚拟用户配置脚本示例

每次新安装服务器后,都要进行一系列的配置,安装软件,修改配置等,为了今后更好的部署vsftpd,特此写了个脚本用于部署 登录FTP有三种方式,匿名登录.本地用户登录和虚拟用户登录. 匿名登录:在登录FTP时使用默认的用户名,一般是ftp或anonymous. 本地用户登录:使用系统用户登录,在/etc/passwd中. 虚拟用户登录:这是FTP专有用户,有两种方式实现虚拟用户,本地数据文件和数据库服务器. FTP虚拟用户是FTP服务器的专有用户,使用虚拟用户登录FTP,只能访问FTP服务器提供的

Linux中echo命令典型的应用示例

下面介绍echo命令的一个使用技巧,将字符串进行翻转操作 把一个字符串翻转示例1: [root@localhost software]# echo "abcdefg" | perl -lne '{$a = reverse($_); print $a;}' 得到的结果: gfedcba 把一个字符串翻转示例2: [root@localhost software]# echo bottle|rev 得到的结果: elttob

基于Zookeeper的分布式锁

这篇文章只需要你10分钟的时间. 实现分布式锁目前有三种流行方案,分别为基于数据库.Redis.Zookeeper的方案,其中前两种方案网络上有很多资料可以参考,本文不做展开.我们来看下使用Zookeeper如何实现分布式锁. 什么是Zookeeper? Zookeeper(业界简称zk)是一种提供配置管理.分布式协同以及命名的中心化服务,这些提供的功能都是分布式系统中非常底层且必不可少的基本功能,但是如果自己实现这些功能而且要达到高吞吐.低延迟同时还要保持一致性和可用性,实际上非常困难.因此z

基于ZooKeeper的分布式锁和队列

在分布式系统中,往往需要一些分布式同步原语来做一些协同工作,上一篇文章介绍了Zookeeper的基本原理,本文介绍下基于Zookeeper的Lock和Queue的实现,主要代码都来自Zookeeper的官方recipe. 锁(Lock) 完全分布式锁是全局同步的,这意味着在任何时刻没有两个客户端会同时认为它们都拥有相同的锁,使用 Zookeeper 可以实现分布式锁,需要首先定义一个锁节点(lock root node). 需要获得锁的客户端按照以下步骤来获取锁: 保证锁节点(lock root

ZooKeeper的分布式锁和队列实现实例教程

在分布式系统中,往往需要一些分布式同步原语来做一些协同工作,上一篇文章介绍了Zookeeper的基本原理,本文介绍下基于Zookeeper的Lock和Queue的实现,主要代码都来自Zookeeper的官方recipe. 锁(Lock) 完全分布式锁是全局同步的,这意味着在任何时刻没有两个客户端会同时认为它们都拥有相同的锁,使用 Zookeeper 可以实现分布式锁,需要首先定义一个锁节点(lock root node). 需要获得锁的客户端按照以下步骤来获取锁:     保证锁节点(lock