基于zookeeper的分布式lock实现

背景

 继续上一篇文章:http://agapple.iteye.com/blog/1183972 ,项目中需要对分布式任务进行调度,那对应的分布式lock实现在所难免。

 

 这一周,在基于BooleanMutex的基础上,实现了zookeeper的分布式锁,用于控制多进程+多线程的lock控制

 

算法

可以预先看一下zookeeper的官方文档: 

 

lock操作过程:

  • 首先为一个lock场景,在zookeeper中指定对应的一个根节点,用于记录资源竞争的内容
  • 每个lock创建后,会lazy在zookeeper中创建一个node节点,表明对应的资源竞争标识。 (小技巧:node节点为EPHEMERAL_SEQUENTIAL,自增长的临时节点)
  • 进行lock操作时,获取对应lock根节点下的所有字节点,也即处于竞争中的资源标识
  • 按照Fair竞争的原则,按照对应的自增内容做排序,取出编号最小的一个节点做为lock的owner,判断自己的节点id是否就为owner id,如果是则返回,lock成功。
  • 如果自己非owner id,按照排序的结果找到序号比自己前一位的id,关注它锁释放的操作(也就是exist watcher),形成一个链式的触发过程。

unlock操作过程:

  • 将自己id对应的节点删除即可,对应的下一个排队的节点就可以收到Watcher事件,从而被唤醒得到锁后退出

其中的几个关键点:

  1. node节点选择为EPHEMERAL_SEQUENTIAL很重要。
    * 自增长的特性,可以方便构建一个基于Fair特性的锁,前一个节点唤醒后一个节点,形成一个链式的触发过程。可以有效的避免"惊群效应"(一个锁释放,所有等待的线程都被唤醒),有针对性的唤醒,提升性能。
    * 选择一个EPHEMERAL临时节点的特性。因为和zookeeper交互是一个网络操作,不可控因素过多,比如网络断了,上一个节点释放锁的操作会失败。临时节点是和对应的session挂接的,session一旦超时或者异常退出其节点就会消失,类似于ReentrantLock中等待队列Thread的被中断处理。
  2. 获取lock操作是一个阻塞的操作,而对应的Watcher是一个异步事件,所以需要使用信号进行通知,正好使用上一篇文章中提到的BooleanMutex,可以比较方便的解决锁重入的问题。(锁重入可以理解为多次读操作,锁释放为写抢占操作)

注意:

  • 使用EPHEMERAL会引出一个风险:在非正常情况下,网络延迟比较大会出现session timeout,zookeeper就会认为该client已关闭,从而销毁其id标示,竞争资源的下一个id就可以获取锁。这时可能会有两个process同时拿到锁在跑任务,所以设置好session timeout很重要。
  • 同样使用PERSISTENT同样会存在一个死锁的风险,进程异常退出后,对应的竞争资源id一直没有删除,下一个id一直无法获取到锁对象。

没有两全其美的做法,两者取其一,选择自己一个能接受的即可

 

代码

1.public class DistributedLock {
2.
3.    private static final byte[]  data      = { 0x12, 0x34 };
4.    private ZooKeeperx           zookeeper = ZooKeeperClient.getInstance();
5.    private final String         root;                                     //根节点路径
6.    private String               id;
7.    private LockNode             idName;
8.    private String               ownerId;
9.    private String               lastChildId;
10.    private Throwable            other     = null;
11.    private KeeperException      exception = null;
12.    private InterruptedException interrupt = null;
13.
14.    public DistributedLock(String root) {
15.        this.root = root;
16.        ensureExists(root);
17.    }
18.
19.    /**
20.     * 尝试获取锁操作,阻塞式可被中断
21.     */
22.    public void lock() throws InterruptedException, KeeperException {
23.        // 可能初始化的时候就失败了
24.        if (exception != null) {
25.            throw exception;
26.        }
27.
28.        if (interrupt != null) {
29.            throw interrupt;
30.        }
31.
32.        if (other != null) {
33.            throw new NestableRuntimeException(other);
34.        }
35.
36.        if (isOwner()) {//锁重入
37.            return;
38.        }
39.
40.        BooleanMutex mutex = new BooleanMutex();
41.        acquireLock(mutex);
42.        // 避免zookeeper重启后导致watcher丢失,会出现死锁使用了超时进行重试
43.        try {
44.            mutex.get(DEFAULT_TIMEOUT_PERIOD, TimeUnit.MICROSECONDS);// 阻塞等待值为true
45.            // mutex.get();
46.        } catch (TimeoutException e) {
47.            if (!mutex.state()) {
48.                lock();
49.            }
50.        }
51.
52.        if (exception != null) {
53.            throw exception;
54.        }
55.
56.        if (interrupt != null) {
57.            throw interrupt;
58.        }
59.
60.        if (other != null) {
61.            throw new NestableRuntimeException(other);
62.        }
63.    }
64.
65.    /**
66.     * 尝试获取锁对象, 不会阻塞
67.     *
68.     * @throws InterruptedException
69.     * @throws KeeperException
70.     */
71.    public boolean tryLock() throws KeeperException {
72.        // 可能初始化的时候就失败了
73.        if (exception != null) {
74.            throw exception;
75.        }
76.
77.        if (isOwner()) {//锁重入
78.            return true;
79.        }
80.
81.        acquireLock(null);
82.
83.        if (exception != null) {
84.            throw exception;
85.        }
86.
87.        if (interrupt != null) {
88.            Thread.currentThread().interrupt();
89.        }
90.
91.        if (other != null) {
92.            throw new NestableRuntimeException(other);
93.        }
94.
95.        return isOwner();
96.    }
97.
98.    /**
99.     * 释放锁对象
100.     */
101.    public void unlock() throws KeeperException {
102.        if (id != null) {
103.            try {
104.                zookeeper.delete(root + "/" + id, -1);
105.            } catch (InterruptedException e) {
106.                Thread.currentThread().interrupt();
107.            } catch (KeeperException.NoNodeException e) {
108.                // do nothing
109.            } finally {
110.                id = null;
111.            }
112.        } else {
113.            //do nothing
114.        }
115.    }
116.
117.    private void ensureExists(final String path) {
118.        try {
119.            Stat stat = zookeeper.exists(path, false);
120.            if (stat != null) {
121.                return;
122.            }
123.
124.            zookeeper.create(path, data, CreateMode.PERSISTENT);
125.        } catch (KeeperException e) {
126.            exception = e;
127.        } catch (InterruptedException e) {
128.            Thread.currentThread().interrupt();
129.            interrupt = e;
130.        }
131.    }
132.
133.    /**
134.     * 返回锁对象对应的path
135.     */
136.    public String getRoot() {
137.        return root;
138.    }
139.
140.    /**
141.     * 判断当前是不是锁的owner
142.     */
143.    public boolean isOwner() {
144.        return id != null && ownerId != null && id.equals(ownerId);
145.    }
146.
147.    /**
148.     * 返回当前的节点id
149.     */
150.    public String getId() {
151.        return this.id;
152.    }
153.
154.    // ===================== helper method =============================
155.
156.    /**
157.     * 执行lock操作,允许传递watch变量控制是否需要阻塞lock操作
158.     */
159.    private Boolean acquireLock(final BooleanMutex mutex) {
160.        try {
161.            do {
162.                if (id == null) {//构建当前lock的唯一标识
163.                    long sessionId = zookeeper.getDelegate().getSessionId();
164.                    String prefix = "x-" + sessionId + "-";
165.                    //如果第一次,则创建一个节点
166.                    String path = zookeeper.create(root + "/" + prefix, data,
167.                            CreateMode.EPHEMERAL_SEQUENTIAL);
168.                    int index = path.lastIndexOf("/");
169.                    id = StringUtils.substring(path, index + 1);
170.                    idName = new LockNode(id);
171.                }
172.
173.                if (id != null) {
174.                    List<String> names = zookeeper.getChildren(root, false);
175.                    if (names.isEmpty()) {
176.                        id = null;//异常情况,重新创建一个
177.                    } else {
178.                        //对节点进行排序
179.                        SortedSet<LockNode> sortedNames = new TreeSet<LockNode>();
180.                        for (String name : names) {
181.                            sortedNames.add(new LockNode(name));
182.                        }
183.
184.                        if (sortedNames.contains(idName) == false) {
185.                            id = null;//清空为null,重新创建一个
186.                            continue;
187.                        }
188.
189.                        //将第一个节点做为ownerId
190.                        ownerId = sortedNames.first().getName();
191.                        if (mutex != null && isOwner()) {
192.                            mutex.set(true);//直接更新状态,返回
193.                            return true;
194.                        } else if (mutex == null) {
195.                            return isOwner();
196.                        }
197.
198.                        SortedSet<LockNode> lessThanMe = sortedNames.headSet(idName);
199.                        if (!lessThanMe.isEmpty()) {
200.                            //关注一下排队在自己之前的最近的一个节点
201.                            LockNode lastChildName = lessThanMe.last();
202.                            lastChildId = lastChildName.getName();
203.                            //异步watcher处理
204.                            zookeeper.exists(root + "/" + lastChildId, new AsyncWatcher() {
205.
206.                                public void asyncProcess(WatchedEvent event) {
207.                                    acquireLock(mutex);
208.                                }
209.
210.                            });
211.
212.                            if (stat == null) {
213.                                acquireLock(mutex);// 如果节点不存在,需要自己重新触发一下,watcher不会被挂上去
214.                            }
215.                        } else {
216.                            if (isOwner()) {
217.                                mutex.set(true);
218.                            } else {
219.                                id = null;// 可能自己的节点已超时挂了,所以id和ownerId不相同
220.                            }
221.                        }
222.                    }
223.                }
224.            } while (id == null);
225.        } catch (KeeperException e) {
226.            exception = e;
227.            if (mutex != null) {
228.                mutex.set(true);
229.            }
230.        } catch (InterruptedException e) {
231.            interrupt = e;
232.            if (mutex != null) {
233.                mutex.set(true);
234.            }
235.        } catch (Throwable e) {
236.            other = e;
237.            if (mutex != null) {
238.                mutex.set(true);
239.            }
240.        }
241.
242.        if (isOwner() && mutex != null) {
243.            mutex.set(true);
244.        }
245.        return Boolean.FALSE;
246.    }
247.}

相关说明:

 

 

测试代码:

1.@Test
2.    public void test_lock() {
3.        ExecutorService exeucotr = Executors.newCachedThreadPool();
4.        final int count = 50;
5.        final CountDownLatch latch = new CountDownLatch(count);
6.        final DistributedLock[] nodes = new DistributedLock[count];
7.        for (int i = 0; i < count; i++) {
8.            final DistributedLock node = new DistributedLock(dir);
9.            nodes[i] = node;
10.            exeucotr.submit(new Runnable() {
11.
12.                public void run() {
13.                    try {
14.                        Thread.sleep(1000);
15.                        node.lock(); //获取锁
16.                        Thread.sleep(100 + RandomUtils.nextInt(100));
17.
18.                        System.out.println("id: " + node.getId() + " is leader: " + node.isOwner());
19.                    } catch (InterruptedException e) {
20.                        want.fail();
21.                    } catch (KeeperException e) {
22.                        want.fail();
23.                    } finally {
24.                        latch.countDown();
25.                        try {
26.                            node.unlock();
27.                        } catch (KeeperException e) {
28.                            want.fail();
29.                        }
30.                    }
31.
32.                }
33.            });
34.        }
35.
36.        try {
37.            latch.await();
38.        } catch (InterruptedException e) {
39.            want.fail();
40.        }
41.
42.        exeucotr.shutdown();
43.    }

升级版

 实现了一个分布式lock后,可以解决多进程之间的同步问题,但设计多线程+多进程的lock控制需求,单jvm中每个线程都和zookeeper进行网络交互成本就有点高了,所以基于DistributedLock,实现了一个分布式二层锁。

 

大致原理就是ReentrantLock 和 DistributedLock的一个结合。

 

 

  •  单jvm的多线程竞争时,首先需要先拿到第一层的ReentrantLock的锁
  • 拿到锁之后这个线程再去和其他JVM的线程竞争锁,最后拿到之后锁之后就开始处理任务。

锁的释放过程是一个反方向的操作,先释放DistributedLock,再释放ReentrantLock。 可以思考一下,如果先释放ReentrantLock,假如这个JVM ReentrantLock竞争度比较高,一直其他JVM的锁竞争容易被饿死。

代码:

1.public class DistributedReentrantLock extends DistributedLock {
2.
3.    private static final String ID_FORMAT     = "Thread[{0}] Distributed[{1}]";
4.    private ReentrantLock       reentrantLock = new ReentrantLock();
5.
6.    public DistributedReentrantLock(String root) {
7.        super(root);
8.    }
9.
10.    public void lock() throws InterruptedException, KeeperException {
11.        reentrantLock.lock();//多线程竞争时,先拿到第一层锁
12.        super.lock();
13.    }
14.
15.    public boolean tryLock() throws KeeperException {
16.        //多线程竞争时,先拿到第一层锁
17.        return reentrantLock.tryLock() && super.tryLock();
18.    }
19.
20.    public void unlock() throws KeeperException {
21.        super.unlock();
22.        reentrantLock.unlock();//多线程竞争时,释放最外层锁
23.    }
24.
25.    @Override
26.    public String getId() {
27.        return MessageFormat.format(ID_FORMAT, Thread.currentThread().getId(), super.getId());
28.    }
29.
30.    @Override
31.    public boolean isOwner() {
32.        return reentrantLock.isHeldByCurrentThread() && super.isOwner();
33.    }
34.
35.}

测试代码:

1.@Test
2.    public void test_lock() {
3.        ExecutorService exeucotr = Executors.newCachedThreadPool();
4.        final int count = 50;
5.        final CountDownLatch latch = new CountDownLatch(count);
6.
7.        final DistributedReentrantLock lock = new DistributedReentrantLock(dir); //单个锁
8.        for (int i = 0; i < count; i++) {
9.            exeucotr.submit(new Runnable() {
10.
11.                public void run() {
12.                    try {
13.                        Thread.sleep(1000);
14.                        lock.lock();
15.                        Thread.sleep(100 + RandomUtils.nextInt(100));
16.
17.                        System.out.println("id: " + lock.getId() + " is leader: " + lock.isOwner());
18.                    } catch (InterruptedException e) {
19.                        want.fail();
20.                    } catch (KeeperException e) {
21.                        want.fail();
22.                    } finally {
23.                        latch.countDown();
24.                        try {
25.                            lock.unlock();
26.                        } catch (KeeperException e) {
27.                            want.fail();
28.                        }
29.                    }
30.
31.                }
32.            });
33.        }
34.
35.        try {
36.            latch.await();
37.        } catch (InterruptedException e) {
38.            want.fail();
39.        }
40.
41.        exeucotr.shutdown();
42.    }

最后

其实再可以发散一下,实现一个分布式的read/write lock,也差不多就是这个理了。项目结束后,有时间可以写一下

 

大致思路:

 

  1. 竞争资源标示:  read_自增id , write_自增id
  2. 首先按照自增id进行排序,如果队列的前边都是read标识,对应的所有read都获得锁。如果队列的前边是write标识,第一个write节点获取锁
  3. watcher监听: read监听距离自己最近的一个write节点的exist,write监听距离自己最近的一个节点(read或者write节点)
时间: 2025-01-19 14:10:56

基于zookeeper的分布式lock实现的相关文章

基于zookeeper的分布式并发锁实践

基于zookeeper的分布式并发锁实践 http://www.365yg.com/item/6421663785764782593/

基于Zookeeper的分布式锁

这篇文章只需要你10分钟的时间. 实现分布式锁目前有三种流行方案,分别为基于数据库.Redis.Zookeeper的方案,其中前两种方案网络上有很多资料可以参考,本文不做展开.我们来看下使用Zookeeper如何实现分布式锁. 什么是Zookeeper? Zookeeper(业界简称zk)是一种提供配置管理.分布式协同以及命名的中心化服务,这些提供的功能都是分布式系统中非常底层且必不可少的基本功能,但是如果自己实现这些功能而且要达到高吞吐.低延迟同时还要保持一致性和可用性,实际上非常困难.因此z

基于ZooKeeper的分布式锁和队列

在分布式系统中,往往需要一些分布式同步原语来做一些协同工作,上一篇文章介绍了Zookeeper的基本原理,本文介绍下基于Zookeeper的Lock和Queue的实现,主要代码都来自Zookeeper的官方recipe. 锁(Lock) 完全分布式锁是全局同步的,这意味着在任何时刻没有两个客户端会同时认为它们都拥有相同的锁,使用 Zookeeper 可以实现分布式锁,需要首先定义一个锁节点(lock root node). 需要获得锁的客户端按照以下步骤来获取锁: 保证锁节点(lock root

日志系统之基于Zookeeper的分布式协同设计

最近这段时间在设计和实现日志系统,在整个日志系统系统中Zookeeper的作用非常重要--它用于协调各个分布式组件并提供必要的配置信息和元数据.这篇文章主要分享一下Zookeeper的使用场景.这里主要涉及到Zookeeper在日志系统中的使用,但其实它在我们的消息总线和搜索模块中也同样非常重要. 日志元数据 日志的类型和日志的字段这里我们统称为日志的元数据.我们构建日志系统的目的最终主要是为了:日志搜索,日志分析.这两大块我们很大程度上依赖于--ElasticSearch(关于什么是Elast

基于ZooKeeper实现分布式锁

ZooKeeper 保证了数据的强一致性,  zk集群中任意节点(一个zkServer)上的相同znode下的数据一定是相同的.使用zookeeper可以非常简单的实现分布式锁, 其基本逻辑如下: 客户端调用create()方法创建名为"locknode/lock"的节点,需要注意的是,这里节点的创建类型需要设置为EPHEMERAL_SEQUENTIAL. 客户端调用getChildren("lock")方法来获取所有已经创建的lock节点的子节点,同时在这个节点上

基于Redis的分布式锁真的安全吗?(下)

自从我写完这个话题的上半部分之后,就感觉头脑中出现了许多细小的声音,久久挥之不去.它们就像是在为了一些鸡毛蒜皮的小事而相互争吵个不停.的确,有关分布式的话题就是这样,琐碎异常,而且每个人说的话听起来似乎都有道理.   今天,我们就继续探讨这个话题的后半部分.本文中,我们将从Antirez反驳Martin Kleppmann的观点开始讲起,然后会涉及到Hacker News上出现的一些讨论内容,接下来我们还会讨论到基于Zookeeper和Chubby的分布式锁是怎样的,并和Redlock进行一些对

基于ZooKeeper的一种简单分布式锁的实现

        ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务.基于ZooKeeper,我们可以实现一种简单的分布式互斥锁,包括可重入与不可重入.代码如下: import java.io.IOException; import java.util.ArrayList; import java.util.Random; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperExcept

用Zookeeper实现分布式锁和选主

Zookeeper可以用来实现Distributed lock(分布式锁)和leader election(选主). 分布式锁和选主虽然用在不同的场景,但是2者的机制是相同的. Zookeeper官方文档上给出了一个recipes,介绍了如何实现分布式锁和选主,2种实现说明的步骤和风格完全不一样,但是本质是一样的. 这里先翻译一下recipes对2者的说明. 获得锁的步骤: 1.调用create(),以"_locknode_/lock-"作为路径名,并且设置sequence和ephem

基于Redis的分布式锁真的安全吗?(上)

网上有关Redis分布式锁的文章可谓多如牛毛了,不信的话你可以拿关键词"Redis 分布式锁"随便到哪个搜索引擎上去搜索一下就知道了.这些文章的思路大体相近,给出的实现算法也看似合乎逻辑,但当我们着手去实现它们的时候,却发现如果你越是仔细推敲,疑虑也就越来越多.   实际上,大概在一年以前,关于Redis分布式锁的安全性问题,在分布式系统专家Martin Kleppmann和Redis的作者antirez之间就发生过一场争论.由于对这个问题一直以来比较关注,所以我前些日子仔细阅读了与这