zookeeper分布式锁

摘要:分享牛原创,zookeeper使用,zookeeper锁在实际项目开发中还是很常用的,在这里我们介绍一下zookeeper分布式锁的使用,以及我们如何zookeeper分布式锁的原理。zookeeper节点理解。

zookeeper分布式锁有什么用呢?首先要明白锁是一个什么东西?举个通俗的例子,把门锁着了,外面的人进不去,里面的人可以随时出来,出来之后,还可以继续加锁。比如我们项目中,主要有供应商系统锁库存这种情况,锁库存的时候不能让其他的人去修改库存信息。这里就需要使用的时候加锁。当然了也可以使用数据库或者redis版本锁的概念,根据版本去区分到底如何锁库。

首先我们看一下zookeeper节点类型。

zookeeper节点类型分为以下四种:

1.1.1. 节点说明

public enum CreateMode {

    /**
     * The znode will not be automatically deleted upon client's disconnect.
     */
    PERSISTENT (0, false, false),
    /**
    * The znode will not be automatically deleted upon client's disconnect,
    * and its name will be appended with a monotonically increasing number.
    */
    PERSISTENT_SEQUENTIAL (2, false, true),
    /**
     * The znode will be deleted upon the client's disconnect.
     */
    EPHEMERAL (1, true, false),
    /**
     * The znode will be deleted upon the client's disconnect, and its name
     * will be appended with a monotonically increasing number.
     */
    EPHEMERAL_SEQUENTIAL (3, true, true);
}

从持久化的层次划分:

1.持久化节点:不删除节点永远存在。

2.非持久节点,换言之就是临时节点,临时节点就是客户端连接的时候创建,客户端挂起的时候,临时节点自动删除。

从排序层次划分:

1.持久有序。

2.持久无序,

3.临时有序。

4.临时无序。

这里需要注意持久化节点可以创建子节点。非持久化节点不能创建子节点。这里可以自己去使用命令去测试。

非持久节点就是创建的时候存在,消失的时候,节点自动删除,所以我们利用这个特性,实现我们的需求,比如,我可以程序启动的时候在指定的持久节点,创建临时节点,当程序挂掉的时候,临时节点消失,我们可以一直监控指定父节点中的子节点集合,就可以监控程序的健康状态。

1.1.2. zookeeper 分布式锁实现

接下来,在上面理解节点的基础之上,我们可以去实现zookeeper 分布式锁。具体怎么实现呢?思路如下:

我们可以创建一个持久节点,在程序中我们每次创建临时子节点,然后我们遍历持久节点下面的子节点,因为临时节点我们设置的时候是有序的。所以我们可以加锁的时候,创建了一个临时有序节点,当我们加锁完成自己的业务之后,释放锁,然后,这个删除临时节点,所以设计的核心点就是:

1.创建父节点。持久节点

2.创建有序的临时子节点。

3.删除临时节点,当业务完成的时候。

4.每次需要判断自身的临时节点,是否是最小的。为什么要判断是最小的呢?因为不是最小的话,说明前面还有一些节点在加锁执行中,所以我们这个节点不能加锁执行。

5.怎么让自身节点监听执行呢?因为如果自身节点是最小的,可以直接执行,如果不是最小的。要监听前面的节点是否已经删除,如果其他的前面的节点都删除了。则自己就可以加锁执行业务代码了。

下面开始书写我们的代码吧?

package com.shareniu.zkTest;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ShareniuDistributedLock implements Watcher {
 private int threadId;
// 主要区分线程
private static  String PREFIX_OF_THREAD = null;
// 子节点的前缀
private static final String EPHEMERAL_SEQUENTIAL_PATH = "/shareniuLock/sub";
// 父节点
private static final String PARENT_PATH = "/shareniuLock";
protected static final String CONNECTION_STRING = "101.201.xx.xx:2181";
protected static final int SESSION_TIMEOUT = 10000;
//开启的线程的数量
private static final int THREAD_NUM = 5;
// 创建临时节点自身
private String selfPath;
// zk连接对象
  public ShareniuDistributedLock(int threadId) {
        this.threadId = threadId;
        PREFIX_OF_THREAD = "【第"+threadId+"个线程】";
    }
private ZooKeeper zooKeeper = null;
private CountDownLatch cdl = new CountDownLatch(1);
private static final CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM);
private String waitPath;
public static void main(String[] args) {
     for(int i=0; i < THREAD_NUM; i++){
            final int threadId = i+1;
            new Thread(){
                @Override
                public void run() {
                    try{
                    	ShareniuDistributedLock dc = new ShareniuDistributedLock(threadId);
                        dc.createConnection(CONNECTION_STRING, SESSION_TIMEOUT);
                        //GROUP_PATH不存在的话,由一个线程创建即可;
                        synchronized (threadSemaphore){
                            dc.createPath(PARENT_PATH, "该节点由线程" + threadId + "创建", true);
                        }
                        dc.getLock();
                    } catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }.start();
        }
        try {
            threadSemaphore.await();
            System.out.println("所有线程运行结束!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
}
/**
 * 实现监听器中的方法拿到WatchedEvent对象
 */
public void process(WatchedEvent event) {
    if(event == null){
            return;
        }
        Event.KeeperState keeperState = event.getState();
        Event.EventType eventType = event.getType();
        if ( Event.KeeperState.SyncConnected == keeperState) {
            if ( Event.EventType.None == eventType ) {
                System.out.println( PREFIX_OF_THREAD + "成功连接上ZK服务器" );
                cdl.countDown();
            }else if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
                try {
                    if(checkMinPathOfChilde()){
                        getLockByShelf();
                    }
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }else if ( Event.KeeperState.Disconnected == keeperState ) {
            System.out.println( PREFIX_OF_THREAD + "与ZK服务器断开连接" );
        } else if ( Event.KeeperState.AuthFailed == keeperState ) {
            System.out.println( PREFIX_OF_THREAD + "权限检查失败" );
        } else if ( Event.KeeperState.Expired == keeperState ) {
            System.out.println( PREFIX_OF_THREAD + "会话失效" );
        }
}

/**
 * 关闭ZK连接
 */
public void releaseConnection() {
if (this.zooKeeper != null) {
try {
this.zooKeeper.close();
} catch (InterruptedException e) {
}
}
System.out.println(PREFIX_OF_THREAD + "释放连接");
}

/***
 * 创建连接
 *
 * @param connectString
 *            连接的服务器字符串
 * @param sessionTimeout
 *            超时时间
 * @throws IOException
 * @throws InterruptedException
 */
public void createConnection(String connectString, int sessionTimeout)
throws IOException, InterruptedException {
zooKeeper = new ZooKeeper(connectString, sessionTimeout, this);
System.out.println("打开连接........");
// 因为打开连接只需要一次,这里为了防止并发,使用的CountDownLatch对象
// 程序一直等待,直到cdl.countDown();方法的调用
cdl.await();
}

/**
 * 创建节点
 *
 * @param path
 *            路径
 * @param data
 *            内容
 * @param needWatch
 * @return
 * @throws KeeperException
 * @throws InterruptedException
 */
public boolean createPath(String path, String data, boolean needWatch)
throws KeeperException, InterruptedException {
// 判断节点是否存在存在就不要创建了。
Stat exists = zooKeeper.exists(path, needWatch);
if (exists == null) {
// 节点不存在
System.out.println(PREFIX_OF_THREAD
+ "节点创建成功, Path: "
+ this.zooKeeper.create(path, data.getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
+ ", content: " + data);
}
return true;
}

/**
 * 获取锁
 *
 * @throws KeeperException
 * @throws InterruptedException
 */
private void getLock() throws KeeperException, InterruptedException {
// 创建临时节点
selfPath = zooKeeper.create(EPHEMERAL_SEQUENTIAL_PATH, null,
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println("创建临时节点:" + selfPath);
if (checkMinPathOfChilde()) {
getLockByShelf();
}
}

/**
 * 获取到锁了
 *
 * @throws InterruptedException
 * @throws KeeperException
 */
private void getLockByShelf() throws KeeperException, InterruptedException {
// 获取到锁了 开始 执行代码 删除节点 释放连接
if (zooKeeper.exists(this.selfPath, false) == null) {
System.out.println(PREFIX_OF_THREAD + "本节点已不在了...");
return;
} else {
// 节点存在
System.out.println(PREFIX_OF_THREAD + "获取锁成功....");
// 休息一下
Thread.sleep(1000);
System.out.println(PREFIX_OF_THREAD + "删除临时节点:" + selfPath);
// 删除的时候,版本是-1就是所有的都删除
zooKeeper.delete(this.selfPath, -1);
releaseConnection();
// 释放锁 其他的程序可以 继续打开连接
cdl.countDown();
}
}

/**
 * 获取锁
 *
 * @return
 * @throws InterruptedException
 * @throws KeeperException
 */
private boolean checkMinPathOfChilde() throws KeeperException,
InterruptedException {
// 获取父节点中的所有子节点
List<String> subNodes = zooKeeper.getChildren(PARENT_PATH, false);
// 临时节点是有序的,那就排序找最小的吧
Collections.sort(subNodes);
int index = subNodes
.indexOf(selfPath.substring(PARENT_PATH.length() + 1));
switch (index) {
case -1: {
System.out.println(PREFIX_OF_THREAD + "节点已不在了..." + selfPath);
return false;
}
case 0: {
System.out.println(PREFIX_OF_THREAD + "自己可以获取锁执行代码了" + selfPath);
return true;
}
default: {
this.waitPath = PARENT_PATH + "/" + subNodes.get(index - 1);
System.out.println(PREFIX_OF_THREAD + "前面的节点" + waitPath);
try {
zooKeeper.getData(waitPath, true, new Stat());
return false;
} catch (KeeperException e) {
if (zooKeeper.exists(waitPath, false) == null) {
System.out.println(PREFIX_OF_THREAD + "本节点前面的节点:"
+ waitPath + "");
// 递归找吧
return checkMinPathOfChilde();
} else {
throw e;
}
}
}
}
}
}

1.1.3. 程序的输出

程序的输出如下:

打开连接........

打开连接........

打开连接........

打开连接........

打开连接........

【第5个线程】成功连接上ZK服务器

【第5个线程】成功连接上ZK服务器

【第5个线程】成功连接上ZK服务器

【第5个线程】成功连接上ZK服务器

【第5个线程】成功连接上ZK服务器

创建临时节点:/shareniuLock/sub0000000001

【第5个线程】自己可以获取锁执行代码了/shareniuLock/sub0000000001

创建临时节点:/shareniuLock/sub0000000002

【第5个线程】获取锁成功....

【第5个线程】前面的节点/shareniuLock/sub0000000001

创建临时节点:/shareniuLock/sub0000000003

创建临时节点:/shareniuLock/sub0000000004

【第5个线程】前面的节点/shareniuLock/sub0000000002

【第5个线程】前面的节点/shareniuLock/sub0000000003

创建临时节点:/shareniuLock/sub0000000005

【第5个线程】前面的节点/shareniuLock/sub0000000004

【第5个线程】删除临时节点:/shareniuLock/sub0000000001

【第5个线程】释放连接

【第5个线程】自己可以获取锁执行代码了/shareniuLock/sub0000000002

【第5个线程】获取锁成功....

【第5个线程】删除临时节点:/shareniuLock/sub0000000002

【第5个线程】释放连接

【第5个线程】自己可以获取锁执行代码了/shareniuLock/sub0000000003

【第5个线程】获取锁成功....

【第5个线程】删除临时节点:/shareniuLock/sub0000000003

【第5个线程】释放连接

【第5个线程】自己可以获取锁执行代码了/shareniuLock/sub0000000004

【第5个线程】获取锁成功....

【第5个线程】删除临时节点:/shareniuLock/sub0000000004

【第5个线程】释放连接

【第5个线程】自己可以获取锁执行代码了/shareniuLock/sub0000000005

【第5个线程】获取锁成功....

【第5个线程】删除临时节点:/shareniuLock/sub0000000005

【第5个线程】释放连接

1.1.4. zk节点的查看

下面我们看一下zk的节点,

 

在此证明,节点是有序的,释放锁的时候,会删除临时节点。

分享牛原创(尊重原创 转载对的时候第一行请注明,转载出处来自分享牛http://blog.csdn.net/qq_30739519)

时间: 2024-10-10 04:49:35

zookeeper分布式锁的相关文章

Curator Zookeeper分布式锁

Curator Zookeeper分布式锁 pom.xml中添加如下配置 <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes --> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version

zookeeper分布式锁避免羊群效应(Herd Effect)

本文主要讲述在使用ZooKeeper进行分布式锁的实现过程中,如何有效的避免"羊群效应( herd effect)"的出现. 一般的分布式锁实现 这里简单的讲下一般的分布式锁如何实现.具体的代码实现可以在这里看到:https://svn.apache.org/repos/asf/zookeeper/trunk/src/recipes/lock/ 在之前的<ZooKeepe数据模型>一文中提到过,zookeeper中节点的创建类型有4类,这里我们重点关注下临时顺序节点.这种类

基于Zookeeper的分布式锁

这篇文章只需要你10分钟的时间. 实现分布式锁目前有三种流行方案,分别为基于数据库.Redis.Zookeeper的方案,其中前两种方案网络上有很多资料可以参考,本文不做展开.我们来看下使用Zookeeper如何实现分布式锁. 什么是Zookeeper? Zookeeper(业界简称zk)是一种提供配置管理.分布式协同以及命名的中心化服务,这些提供的功能都是分布式系统中非常底层且必不可少的基本功能,但是如果自己实现这些功能而且要达到高吞吐.低延迟同时还要保持一致性和可用性,实际上非常困难.因此z

ZooKeeper的分布式锁和队列实现实例教程

在分布式系统中,往往需要一些分布式同步原语来做一些协同工作,上一篇文章介绍了Zookeeper的基本原理,本文介绍下基于Zookeeper的Lock和Queue的实现,主要代码都来自Zookeeper的官方recipe. 锁(Lock) 完全分布式锁是全局同步的,这意味着在任何时刻没有两个客户端会同时认为它们都拥有相同的锁,使用 Zookeeper 可以实现分布式锁,需要首先定义一个锁节点(lock root node). 需要获得锁的客户端按照以下步骤来获取锁:     保证锁节点(lock

redis分布式锁的运用和理解

问题描述 redis分布式锁的运用和理解 redis是单进程单线程模式 ,为什么还需要分布式锁?跨jvm是指多个服务器上在运行同一个服务吗? 网上看了很多说跨jvm会需要锁 ,但还是不怎么理解 解决方案 redis分布式锁redis分布式锁-SETNX实现Redis实现分布式锁 解决方案二: redis 3.0也有集群模式了,可以多个机器组成一个整体的cache 解决方案三: redis分布式锁和zookeeper分布式锁都在在企业服务中常用的知识.譬如:你有一个服务程序,需要部署在5台独立的机

基于Redis的分布式锁真的安全吗?(下)

自从我写完这个话题的上半部分之后,就感觉头脑中出现了许多细小的声音,久久挥之不去.它们就像是在为了一些鸡毛蒜皮的小事而相互争吵个不停.的确,有关分布式的话题就是这样,琐碎异常,而且每个人说的话听起来似乎都有道理.   今天,我们就继续探讨这个话题的后半部分.本文中,我们将从Antirez反驳Martin Kleppmann的观点开始讲起,然后会涉及到Hacker News上出现的一些讨论内容,接下来我们还会讨论到基于Zookeeper和Chubby的分布式锁是怎样的,并和Redlock进行一些对

ZooKeeper场景实践:(7) 分布式锁

1.基本介绍 分布式锁是控制分布式系统之间同步访问共享资源的一种方式,需要互斥来防止彼此干扰来保证一致性.利用Zookeeper的强一致性可以完成锁服务.Zookeeper的官方文档是列举了两种锁,独占锁和共享锁.独占锁保证任何时候都只有一个进程能或者资源的读写权限.共享锁可以同时有多个读,但是同一时刻最多只能有一个写,读和写是互斥的. 2.场景分析 我们准备来实现互斥的锁,按照官网的思路,给定一个锁的路径,如/Lock,所有要申请这个锁的进程都在/Lock目录下创建一个/Lock/lock-的

基于ZooKeeper的分布式锁和队列

在分布式系统中,往往需要一些分布式同步原语来做一些协同工作,上一篇文章介绍了Zookeeper的基本原理,本文介绍下基于Zookeeper的Lock和Queue的实现,主要代码都来自Zookeeper的官方recipe. 锁(Lock) 完全分布式锁是全局同步的,这意味着在任何时刻没有两个客户端会同时认为它们都拥有相同的锁,使用 Zookeeper 可以实现分布式锁,需要首先定义一个锁节点(lock root node). 需要获得锁的客户端按照以下步骤来获取锁: 保证锁节点(lock root

基于ZooKeeper的一种简单分布式锁的实现

        ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务.基于ZooKeeper,我们可以实现一种简单的分布式互斥锁,包括可重入与不可重入.代码如下: import java.io.IOException; import java.util.ArrayList; import java.util.Random; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperExcept