前言
在JVM中,怎么样防止因多线程并发造成的数据不一致或逻辑混乱?大家一定都能想到锁,能想到java.util.concurrent
包下的各种各样的用法。
但在分布式环境下,不同的机器、不同的jvm,那些好用的并发锁工具,并不能满足需要。
所以我们需要有一个分布式锁
,来解决分布式环境下的并发问题。而本文正是在这条道路上,走出的一些经验的总结。
我们按照待解决问题的场景,一步一步看下去。
问题一:如何实现一个分布式锁;
锁,大多是基于一个只能被1个线程
占用、得到的资源来实现的。
JVM的锁,是基于CPU对于寄存器的修改指令cmpxchg
,来保证旧值
改为新值
的操作,只有一个能成功。这里的旧值
,就是这个被争夺的资源
,大多数情况,并发时第一个线程对旧值
修改成功后,其他线程就没有机会了。(当然,ABA是另外一个话题,这里就不说了。。)
所以,在分布式环境下,要实现一个锁,我们就要找个一个只能被1个线程
占用的资源。
有经验的开发很快能想到,共享磁盘文件、缓存、mysql数据库,这些分布式环境下,数据表现为单份
的,都应当能满足需求。
然而,基于文件、DB会遭遇各式各样的问题,性能,经常也会是瓶颈。
因此,我们这里使用的,是淘系用的最多的缓存中间件产品--Tair。
查看com.taobao.tair.TairManager
接口,发现有以下几个接口或许适合使用:
TairManager.incr
加法计数器。首次使用时,返回值能等于默认值,而不被+n
的机会,只有一个。貌似可以用。TairManager.lock
看起来名字像是这个意思,姑且拿来一试。TairManager.put
传入version版本进行校验,cas原则会保证只有一个能成功。貌似可以用。
(注:mdb有可能丢、且invalid时不保证跨机房一致性,所以这个锁肯定需要用ldb来实现的
。)
在线上多机房情况下,做了一下测试,测试程序核心代码大约是:
void test(){ // 2个机房的jvm实例,每个实例n个线程同时执行本方法;
while(true){
if(tryLock()){ // tryLock\unLock 的实现对应有3套;
try{
logger.info("Got lock! Hostname:{} ThreadName:{}",getHostname(),Thread.currentThread().getName());
Thread.sleep(1000);
}finally {
unLock();
logger.info("Release lock! Hostname:{} ThreadName:{}",getHostname(),Thread.currentThread().getName());
}
}
}
}
测试的结论如下:
TairManager.incr
初始几次锁的获取和释放没有问题,但是后来返回值很大,就谁也拿不到锁;怀疑和接口超时、或invalid机制有关;- 补充: 后面发现incr做锁的一种可靠方案是,使用值限定参数:
int lowBound
,int upperBound
比如:lowBound=0, upperBound=1,则value一直在0与1之间切换,用过多次,还是很靠谱的。
- 补充: 后面发现incr做锁的一种可靠方案是,使用值限定参数:
TairManager.lock
完全不行,lock接口看来根本不是做这个用的;真正的使用场景是锁住一个key不容许更新
,不是锁机制。0
TairManager.put
非常稳定、靠谱;有个现象值得关注:A机房invalid之后,B机房会先拿到锁;因为invalid先从远程机房开始;
最后,给出ldb put
实现的分布式锁的核心代码(__后面都基于ldb put来实现__):
public boolean trylock(String key) {
ResultCode code = ldbTairManager.put(NAMESPACE, key, "This is a Lock.", 2, 0);
if (ResultCode.SUCCESS.equals(code))
return true;
else
return false;
}
public boolean unlock(String key) {
ldbTairManager.invalid(NAMESPACE, key);
}
问题二:lock之后,程序发布\或者进程crash,trylock就永远false了;
每次发布,总发现有些异常数据,拿不到锁,不能继续向前走;
仔细分析,原来tair的lock,一直没能释放;
要解决这个问题,可以先不管原因,无脑的给tair put加上超时时间
就行,这样业务至少可以自行恢复。
但是,这个超时时间需要仔细考虑
把握,需要在业务承受范围之内。
注: 像程序发布、进程crash,这种情况,是无可避免的让锁没机会释放。还有其他可能性,大多是bug了。。
public boolean trylock(String key, int timeout) {
ResultCode code = ldbTairManager.put(NAMESPACE, key, "This is a Lock.", 2, timeout);
if (ResultCode.SUCCESS.equals(code))
return true;
else
return false;
}
问题二:tair存在让人苦恼的超时问题,即使千分之1,本业务有时也不能容忍
tair的大神给的回复很简单:超时请重试
;
仔细想一下,put超时
分两种情况:
- 上一次put其实已经成功了;那么重试肯定会失败;
- 上一次put其实失败了;那么若这一次顺利,就会成功;
那如何解决上面的第1点呢?
其实,锁应该是能够经得起复查
的(类似偏向锁):A拿到的锁,没有unlock之前,无论A重试检查多少次,都是A的!
怎么实现?
既然用的是ldb缓存,它是key-value
结构的,前面version控制等,都只用到了key。
这里,我们可以从tair value里做文章:让value包含机器ip+线程name,trylock内先get value做检查
于是,实现变为:
public boolean trylock(String key, int timeout) {
Result<DataEntry> result = locker.ldbTairManager.get(NAMESPACE, lockKey);
if (result == null) return null;
if (ResultCode.DATANOTEXSITS.equals(result.getRc())) { // means lock is free
ResultCode code = ldbTairManager.put(NAMESPACE, key, getLockValue(), 2, timeout);
if (ResultCode.SUCCESS.equals(code))
return true;
}else if(result.getValue() != null && getLockValue().equals(result.getValue().getValue())){
return true;
}
return false;
}
private String getLockValue(){
return Utils.getHostname() + ":" + Thread.currentThread().getName();
}
注意: 其实这里线程复用时,ThreadName有相同风险,可以改为uuid逻辑,需复用锁时传入uuid。
public boolean trylock(String key, int timeout, String uuid){ ... }
问题三:新方案其实多了一次get操作,若是get也超时怎么办?
超时无法避免,还是要靠重试!(前提是逻辑可以重试)
public boolean trylock(String key, int timeout) {
Result<DataEntry> result = locker.ldbTairManager.get(NAMESPACE, key);
if (result == null || ResultCode.CONNERROR.equals(result.getRc()) || ResultCode.TIMEOUT.equals(result.getRc()) || ResultCode.UNKNOW.equals(result.getRc())) // get timeout retry case
result = locker.ldbTairManager.get(NAMESPACE, key);
if (result == null || ResultCode.CONNERROR.equals(result.getRc()) || ResultCode.TIMEOUT.equals(result.getRc()) || ResultCode.UNKNOW.equals(result.getRc())){ // 还是超时,则留下日志痕迹
logger.error("ldb tair get timeout. key:{}.",key);
return false;
}
if (ResultCode.DATANOTEXSITS.equals(result.getRc())) { // means lock is free
ResultCode code = ldbTairManager.put(NAMESPACE, key, getLockValue(), 2, timeout);
if (ResultCode.SUCCESS.equals(code))
return true;
else if(code==null || ResultCode.CONNERROR.equals(result.getRc()) || ResultCode.TIMEOUT.equals(result.getRc()) || ResultCode.UNKNOW.equals(result.getRc())){ // put timeout retry case
return trylock(key, timeout); // 递归尝试
}
}else if(result.getValue() != null && getLockValue().equals(result.getValue().getValue())){
return true;
}
return false;
}
// 【注意】其实这里线程复用时,ThreadName有相同风险,可以改为uuid逻辑,复用锁传入uuid。
private String getLockValue(){
return Utils.getHostname() + ":" + Thread.currentThread().getName();
}
进一步的,我们还可以对get/put的retry做次数控制;
真实线上的情况,一般一次retry就能解决问题,次数多了,反而可能导致雪崩,需要慎重;
多一次Get的性能影响
有代码洁癖、性能洁癖的人可能会想:普通tair锁,一次put就能搞定,这里却要先get再put,浪费啊。。。
这里梳理一下:
- 若是
锁已经被持有
,那么get之后,麻烦发现被持有,直接返回失败;这时,并不会再次put,开销是一样的;(甚至get的开销,比put要小,至少不会占用put的限流阈值) - 若是
没人持有锁
,确实这时get有些浪费的,但是为了锁可以复查
这个特性(可重试)、为了能解决超时这个问题,我认为还是值得的。在实际场景中,开发者自己可以评估是否需要。比如:拿前面的uuid的样例API讲,若不需要这个特性时,就不传入uuid,那么实现代码里,可以自动降级为只有一个put的锁实现;
问题四:批量锁
批量锁,主要注意拿锁的顺序和释放锁相反
,伪代码如下:
if(trylock("A") && trylock("B") && trylock("C")){
try{
// do something
}finally{ // 注意这里的顺序要反过来
unlock("C");
unlock("B");
unlock("A");
}
}
最后总结下,给一个完整代码 :smile:
import com.taobao.tair.DataEntry;
import com.taobao.tair.Result;
import com.taobao.tair.ResultCode;
import com.taobao.tair.TairManager;
import org.apache.commons.lang.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.FormattingTuple;
import org.slf4j.helpers.MessageFormatter;
import javax.annotation.Resource;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class CommonLocker {
private static final Logger logger = LoggerFactory.getLogger(CommonLocker.class);
@Resource
private TairManager ldbTairManager;
private static final short NAMESPACE = 1310;
private static CommonLocker locker;
public void init() {
if (locker != null) return;
synchronized (CommonLocker.class) {
if (locker == null)
locker = this;
}
}
public static Lock newLock(String format, Object... argArray) {
FormattingTuple ft = MessageFormatter.arrayFormat(format, argArray);
return newLock(ft.getMessage());
}
public static Lock newLock(String strKey) {
String key = "_tl_" + strKey;
return new TairLock(key, CommonConfig.lock_default_timeout);
}
public static Lock newLock(String strKey, int timeout) {
String key = "_tl_" + strKey;
return new TairLock(key, timeout);
}
private static class TairLock implements Lock {
private String lockKey;
private boolean gotLock = false;
private int retryGet = 0;
private int retryPut = 0;
private int timeout;
public TairLock(String key, int timeout) {
this.lockKey = tokey(key);
this.timeout = timeout;
}
public boolean tryLock() {
return tryLock(timeout);
}
/**
* need finally do unlock
*
* @return
*/
public boolean tryLock(int timeout) {
Result<DataEntry> result = locker.ldbTairManager.get(NAMESPACE, lockKey);
while (retryGet++ < CommonConfig.lock_get_max_retry &&
(result == null || ResultCode.CONNERROR.equals(result.getRc()) || ResultCode.TIMEOUT.equals(result.getRc()) || ResultCode.UNKNOW.equals(result.getRc()))) // 重试一次
result = locker.ldbTairManager.get(NAMESPACE, lockKey);
if (ResultCode.DATANOTEXSITS.equals(result.getRc())) { // lock is free
// 已验证version 2表示为空,若不是为空,则返回version error
ResultCode code = locker.ldbTairManager.put(NAMESPACE, lockKey, locker.getValue(), 2, timeout);
if (ResultCode.SUCCESS.equals(code)) {
gotLock = true;
return true;
} else if (retryPut++ < CommonConfig.lock_put_max_retry &&
(code == null || ResultCode.CONNERROR.equals(result.getRc()) || ResultCode.TIMEOUT.equals(result.getRc()) || ResultCode.UNKNOW.equals(result.getRc()))) {
return tryLock(timeout);
}
} else if (result.getValue() != null && locker.getValue().equals(result.getValue().getValue())) {
// 【注意】其实这里线程复用时,ThreadName有相同风险,可以改为uuid逻辑,复用锁传入uuid。
// 若是自己的锁,自己继续用
gotLock = true;
return true;
}
// 到这里表示没有拿到锁
return false;
}
public void unlock() {
if (gotLock) {
ResultCode invalidCode = locker.ldbTairManager.invalid(NAMESPACE, lockKey);
gotLock = false;
}
}
public void lock() {
throw new NotImplementedException();
}
public void lockInterruptibly() throws InterruptedException {
throw new NotImplementedException();
}
public boolean tryLock(long l, TimeUnit timeUnit) throws InterruptedException {
throw new NotImplementedException();
}
public Condition newCondition() {
throw new NotImplementedException();
}
}
// 【注意】其实这里线程复用时,ThreadName有相同风险,可以改为uuid逻辑,复用锁传入uuid。
private String getValue() {
return getHostname() + ":" + Thread.currentThread().getName();
}
/**
* 获得机器名
*
* @return
*/
public static String getHostname() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
return "[unknown]";
}
}
public void setLdbTairManager(TairManager ldbTairManager) {
this.ldbTairManager = ldbTairManager;
}
}
使用样例
Lock lockA = CommonLocker.newLock("hs_room_{}_uid_{}", roomDo.getUuid(), roomDo.getMaster().getUid());
Lock lockB = CommonLocker.newLock("hs_room_{}_uid_{}", roomDo.getUuid(), roomDo.getPartnerList().get(0).getUid());
try {
if (lockA.tryLock() && lockB.tryLock()) {// 分布式锁定本任务
// do something....
}
} finally {
lockB.unlock();
lockA.unlock();
}
祝大家用的愉快,Happy Coding!!