HBase行锁原理及实现

        请带着如下问题阅读本文。

       1、什么是行锁?

       2、HBase行锁的原理是什么?

       3、HBase行锁是如何实现的?

       4、HBase行锁是如何应用的?

        一、什么是行锁?

        我们知道,数据库中存在事务的概念。事务是作为单个逻辑工作单元执行的一系列操作,要么完全地执行,要么完全的不执行。而事务的四大特点即原子性、一致性、分离性和持久性。其中,原子性首当其冲,那么在HBase内部实现其原子性的重要保证是什么呢?答案就是行锁。

        什么是行锁呢?顾名思义,它就是加在行上的一把锁。在它未释放该行前,最起码其他访问者是无法对该行做修改的,即要修改的话,必须得获得该行的锁才能拥有修改改行数据的权限,这就是行锁的含义。

        二、HBase行锁实现原理

        HBase行锁是利用Java并发包concurrent里的CountDownLatch(1)来实现的。它的主要思想就是在服务器端每个访问者单独一个数据处理线程,每个处理线程针对特定行数据修改时必须获得该行的行锁,而其他客户端线程想要修改数据的话,必须等待前面的线程释放锁后才被允许,这就利用了Java并发包中的CountDownLatch,CountDownLatch为Java中的一个同步辅助类,在完成一组正在其他线程中进行的操作之前,它允许一个或多个线程一直等待。这里,将线程数设置为1,十分巧妙的实现了独占锁的概念。

        三、HBase行锁的实现

        HBase的行锁主要是通过HRegion的两个内部类实现的,其中一个是RowLock,另外一个是RowLockContext。

        我们首先看RowLock这个类,其定义如下:

/**
   * Row lock held by a given thread.
   * One thread may acquire multiple locks on the same row simultaneously.
   * The locks must be released by calling release() from the same thread.
   *
   * 给定线程持有的行锁。
   * 一个线程可以同时获得同一行上的多个锁。
   * 锁必须被相同线程,通过调用release()释放。
   */
  public static class RowLock {
	// 行锁上下文,持有锁定的行row、锁持有者线程thread、该行上锁的数目lockCount等内容
    @VisibleForTesting final RowLockContext context;
    // 行锁是否被释放
    private boolean released = false;

    // 构造函数
    @VisibleForTesting RowLock(RowLockContext context) {
      this.context = context;
    }

    /**
     * Release the given lock.  If there are no remaining locks held by the current thread
     * then unlock the row and allow other threads to acquire the lock.
     * 释放给定的锁。如果当前线程不再持有任何锁,那么对该行解锁并允许其他线程获得锁。
     * @throws IllegalArgumentException if called by a different thread than the lock owning thread
     */
    public void release() {
      if (!released) {
        context.releaseLock();
        released = true;
      }
    }
  }

        通过上述源码我们可以看到,行锁RowLock有两个成员变量,RowLockContext类型的行锁上下文context和布尔类型的行锁是否释放released。其中,行锁上下文context持有锁定的行row、锁持有者线程thread、该行上锁的数目lockCount等内容,并且,利用java的concurrent并发包里的CountDownLatch(1)实现了线程对对象的独占锁。

        RowLockContext的源码如下:

  // 行锁上下文,包括指定的行row,同步计数器latch,锁的数目lockCount和线程thread
  @VisibleForTesting class RowLockContext {
    private final HashedBytes row;// 行
    private final CountDownLatch latch = new CountDownLatch(1);//
    private final Thread thread;
    private int lockCount = 0;

    // 构造方法
    RowLockContext(HashedBytes row) {
      this.row = row;
      this.thread = Thread.currentThread();
    }

    // 判断是否为当前线程对应的行锁上下文
    boolean ownedByCurrentThread() {
      return thread == Thread.currentThread();
    }

    RowLock newLock() {
      lockCount++;
      return new RowLock(this);
    }

    void releaseLock() {
      if (!ownedByCurrentThread()) {
        throw new IllegalArgumentException("Lock held by thread: " + thread
          + " cannot be released by different thread: " + Thread.currentThread());
      }
      lockCount--;
      if (lockCount == 0) {
        // no remaining locks by the thread, unlock and allow other threads to access
        RowLockContext existingContext = lockedRows.remove(row);
        if (existingContext != this) {
          throw new RuntimeException(
              "Internal row lock state inconsistent, should not happen, row: " + row);
        }

        // 同步计数器减1
        latch.countDown();
      }
    }
  }

        通过源码我们可以看到,行锁的上下文信息,主要包括行锁对应的行row以及占用该行锁的线程thread。构造RowContext时,只需传入行row即可,占用的线程则通过Thread.currentThread()获得当前线程。

       新加锁时,通过调用newLock()方法即可实现,首先锁的计数器lockCount加1,然后返回由当前RowContext构造RowLock实例即可。

       释放锁时,通过调用releaseLock()方法即可实现,首先通过ownedByCurrentThread()方法确保调用releaseLock()方法的当前线程是否和RowContext持有的线程一致,然后,锁的计数器lockCount减1,并且,如果lockCount为0的话,说明不再有操作占用该行,将row对应的行锁从数据结构lockedRows中删除,允许其他线程获得该行的行锁,最后,最重要的一步,latch.countDown(),就可完成行锁的释放了。

        四、HBase行锁的使用

        下面,我们看下HBase行锁的使用。在涉及数据变更的操作,比如Put、Delete等中,在对一行数据操作之前,都会调用getRowLockInternal()方法,获得该行数据的行锁。代码如下:

/**
   * A version of getRowLock(byte[], boolean) to use when a region operation has already been
   * started (the calling thread has already acquired the region-close-guard lock).
   */
  protected RowLock getRowLockInternal(byte[] row, boolean waitForLock) throws IOException {

	// 构造HashedBytes类型表示的行,rowKey
	HashedBytes rowKey = new HashedBytes(row);
	// 创建行锁上下文实例,并制定为行rowkey和当前线程拥有
    RowLockContext rowLockContext = new RowLockContext(rowKey);

    // loop until we acquire the row lock (unless !waitForLock)
    while (true) {
      // 将rowkey与行锁上下文的对应关系添加到Region的数据结构lockedRows中
      RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
      if (existingContext == null) {
        // Row is not already locked by any thread, use newly created context.
    	// 该行已经没有被任何线程锁住,使用这个新创建的上下文
        break;
      } else if (existingContext.ownedByCurrentThread()) {
        // Row is already locked by current thread, reuse existing context instead.
    	// 该行已经被当前线程锁住,复用当前线程之前创建的行锁上下文实例
        rowLockContext = existingContext;
        break;
      } else {
    	// 该行被其他线程锁住,如果不需要等待锁,直接返回null
        if (!waitForLock) {
          return null;
        }

        TraceScope traceScope = null;
        try {
          if (Trace.isTracing()) {
            traceScope = Trace.startSpan("HRegion.getRowLockInternal");
          }
          // Row is already locked by some other thread, give up or wait for it
          // 行已经被其他线程锁住,放弃或者等待
          // 等待rowLockWaitDuration时间后,如果还未获得行锁,直接抛出异常
          if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
            if(traceScope != null) {
              traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");
            }
            throw new IOException("Timed out waiting for lock for row: " + rowKey);
          }
          if (traceScope != null) traceScope.close();
          traceScope = null;
        } catch (InterruptedException ie) {
          LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
          InterruptedIOException iie = new InterruptedIOException();
          iie.initCause(ie);
          throw iie;
        } finally {
          if (traceScope != null) traceScope.close();
        }
      }
    }

    // allocate new lock for this thread
    return rowLockContext.newLock();
  }

        具体流程整理如下:

        1、利用byte[]类型的入参row构造HashedBytes类型表示的行,即rowKey;
        2、利用rowKey创建行锁上下文实例,并指定为行rowKey和当前线程拥有;
        3、循环:
              3.1、将rowKey与行锁上下文的对应关系添加到Region的数据结构lockedRows中,可能出现以下几种情况:
                   3.1.1、如果lockedRows中之前不存在对应行rowKey的数据,说明该行当前没有被任何线程锁住,使用这个新创建的上下文rowLockContext,跳出循环并返回,说   明当前行可用;
                   3.1.2、如果该行已经被当前线程锁住,复用当前线程之前创建的行锁上下文实例,并赋值给rowLockContext,跳出循环并返回,说明当前行可用;
                   3.1.3、如果该行被其他线程锁住,如果入参确定不需要等待锁的获取,直接返回null,否则重复循环,直到等待rowLockWaitDuration时间后,如果还未获得行锁,   直接抛出异常。

        至于都是哪些地方需要获取行锁,在以后各种数据读写流程中再做分析吧~

时间: 2024-10-26 22:43:43

HBase行锁原理及实现的相关文章

《HBase权威指南》一3.4 行锁

3.4 行锁 像put().delete().checkAndPut()这样的修改操作是独立执行的,这意味着在一个串行方式的执行中,对于每一行必须保证行级别的操作是原子性的.region服务器提供了一个行锁(row lock)的特性,这个特性保证了只有一个客户端能获取一行数据相应的锁,同时对该行进行修改.在实践中,大部分客户端应用程序都没有提供显式的锁,而是使用这个机制来保障每个操作的独立性. 文字用户应该尽可能地避免使用行锁.就像在RDBMS中,两个客户端很可能在拥有对方要请求的锁时,又同时请

数据库事务系列-HBase行级事务模型

HBase是BigTable的开源实现,事务模型也与BigTable一脉相承 – 仅支持行级别的事务.虽然Jeff Dean大神在接受采访时公开承认目前在技术领域最后悔的事情就是没有在BigTable中加入跨行事务模型,以至于之后很多团队都在BigTable之上重复造各种各样的分布式事务轮子.这点笔者是认同的,现在确实有很多团队在HBase之上造了很多轮子(Tephra | Trafodian | Omid),试想如果这个工作做在了BigTable里面,这些团队的人是不是可以做更多其他有意义的事

【赠书】拨云见日 - 深入解析Oracle TX行锁(下)

前文中我们详细介绍了TX行锁的概念,危害以及应对方案,并通过双11的一个经典案例进行了解读.今天我们分享另外一个跟TX行锁有关的案例. 案例描述 技术层面: 1.2017年某天,从当天大约10:30开始,A库上出现持续不断的严重行锁. 2.最初应用开发方的处理方式是不断杀会话(且并未通知我方),然而锁的问题一直持续到晚上还存在. 业务层面: 1.选号开户业务受到严重影响,成功开户的业务量仅为平常时刻的30%都不到. 2.业务群各地州市均有反馈该业务处理慢. 首先我们选取故障发生前1小时到处理前最

RDS for MySQL InnoDB 行锁等待和锁等待超时的处理

RDS for MySQL InnoDB 行锁等待和锁等待超时的处理   1. InnoDB 引擎表行锁等待和等待超时发生的场景 2.InnoDB 引擎行锁等待情况的处理 2.1 InnoDB 行锁等待超时参数 innodb_lock_wait_timeout 2.2 大量行锁等待和行锁等待超时的处理 1. InnoDB 引擎表行锁等待和等待超时发生的场景 当一个 RDS for MySQL 连接会话等待另外一个会话持有的互斥行锁时,会发生 InnoDB 引擎表行锁等待情况. 通常情况下,持有该

mysql-MYSQL INNODB表锁和行锁的问题

问题描述 MYSQL INNODB表锁和行锁的问题 Id是主键.以下语句分别是行锁还是表锁? 第一句:update Table set X=1 where Id IN (1,2,4,7); 第二句:update Table set X=1 where Id Between 1 AND 10; 第三句:update Table set X=1 where Id>=1 AND Id<100; 解决方案 这些都是行锁,只有lock table语句innodb才会申请表锁

c#+oracle数据库行锁写法问题

问题描述 c#+oracle数据库行锁写法问题 请教各位高手: 我在c#+oracle,里面,想如此操作.当修改某一数据的前,先执行select行锁定,待修改完毕后再解锁.请问在不用存储过程的情况下,程序该如何写呢? 解决方案 最简单的是使用事务.http://www.cnblogs.com/yanghucheng/archive/2013/01/25/2876492.htmlhttp://happypigs.iteye.com/blog/1576282 解决方案二: 谢谢,能举个实际例子吗 解

安卓软件-QQ设备锁原理获取设备哪些信息

问题描述 QQ设备锁原理获取设备哪些信息 QQ设备锁是根据手机的什么来确定是否为已验证手机,可以登录的.比如说可能获取设备的,IME,机型,等等,具体有哪些 解决方案 机型,串号,Android ID,wifi网卡mac地址,版本,分辨率,内置存储空间大小(/system和/data的大小),CPU版本,CPU核心数,CPU型号,内存RAM大小,基带版本,,,,,,,等等 解决方案二: 获取设备信息获取设备信息

SQL Server2000使用行锁解决并发库存为负超卖问题

假设库存表结构及数据如下:  代码如下 复制代码 create table Stock (     Id int identity(1,1) primary key,     Name nvarchar(50),     Quantity int ) --insert insert into Stock select 'orange',10 在秒杀等高并发情况下,使用下面的存储过程会导致库存为负产生超卖问题:  代码如下 复制代码 create procedure [dbo].Sale    

PostgreSQL 使用advisory lock或skip locked消除行锁冲突, 提高几十倍并发更新效率

PostgreSQL 使用advisory lock或skip locked消除行锁冲突, 提高几十倍并发更新效率 作者 digoal 日期 2016-10-18 标签 PostgreSQL , advisory lock , 高并发更新 背景 通常在数据库中最小粒度的锁是行锁,当一个事务正在更新某条记录时,另一个事务如果要更新同一条记录(或者申请这一条记录的锁),则必须等待锁释放. 通常持锁的时间需要保持到事务结束,也就是说,如果一个长事务持有了某条记录的锁,其他会话要持有这条记录的锁,可能要