一种基于mysql实现分布式锁的方式
介绍
分布式锁,即分布式系统中的锁。在单体应用中我们通过锁解决的是控制共享资源访问的问题,而分布式锁,就是解决了分布式系统中控制共享资源访问的问题。与单体应用不同的是,分布式系统中竞争共享资源的最小粒度从线程升级成了进程。
目前比较常见的实现分布式锁的方式主要有三种:基于数据库实现、基于Zookeeper实现、基于redis实现。本文主要介绍了一种基于mysql数据库实现分布式锁的方式,从最简单的实现方式开始,一步步构造一个拥有基本分布式锁条件的程序。
版本1
mysql有两种实现分布式锁的思路,乐观锁和悲观锁,本文利用悲观锁方式实现。
select * from tableName where key = " " for update
在InnoDB下如果key为索引,则会为该行加上排他锁,若其它线程想获得排他锁则会阻塞。
伪代码:
where(true){
select ... for update
if(记录存在)
//业务逻辑
return;
else
inset ...
}
commit;
如上的版本会产生两个问题:
- insert时通过唯一键重复报错,处理错误形式不和
- 由于间隙锁原因,并发插入会引发死锁
版本2
为了解决上一版本的问题,本文引入中央锁的概念,也同时加入了数据库锁表和状态位。
- 创建数据库锁表,插入中央锁记录
-- 锁表,单库单表 CREATE TABLE IF NOT EXISTS credit_card_user_tag_db.t_tag_lock ( -- 记录index Findex INT NOT NULL AUTO_INCREMENT COMMENT '自增索引id', -- 锁信息(key、计数器、过期时间、记录描述) Flock_name VARCHAR(128) DEFAULT '' NOT NULL COMMENT '锁名key值', Fcount INT NOT NULL DEFAULT 0 COMMENT '计数器', Fdeadline DATETIME NOT NULL DEFAULT '1970-01-01 00:00:00' COMMENT '锁过期时间', Fdesc VARCHAR(255) DEFAULT '' NOT NULL COMMENT '值/描述', -- 记录状态及相关事件 Fcreate_time DATETIME NOT NULL DEFAULT '1970-01-01 00:00:00' COMMENT '创建时间', Fmodify_time DATETIME NOT NULL DEFAULT '1970-01-01 00:00:00' COMMENT '修改时间', Fstatus TINYINT NOT NULL DEFAULT 1 COMMENT '记录状态,0:无效,1:有效', -- 主键(PS:总索引数不能超过5) PRIMARY KEY (Findex), -- 唯一约束 UNIQUE KEY uniq_Flock_name(Flock_name), -- 普通索引 KEY idx_Fmodify_time(Fmodify_time) )ENGINE=INNODB DEFAULT CHARSET=UTF8;
- 伪代码:
select from tableName where key = "中央所名字" for update;
select ... fro update;
if(记录存在)
//返回记录
else
insert ...
//返回记录
//判断状态位
//更新状态位
//执行业务逻辑
目前版本存在的一个问题是应用的所有记录共享一个中央锁,并发并不高。
版本3
为了解决上一版本的问题,本文提出了解决办法,即:
- 在数据库锁表中插入多条中央锁数据记录
- 获取中央锁时根据key的哈希值来选择一个中央锁
伪代码:
见文章最后完整代码
经测试,版本3相较于版本2性能提升近一倍。
但是目前还存在两个问题:
- 锁无法 重入
- 没有超时解锁机制
版本4
为了解决版本3中的问题,本文做出以下措施
- 数据库中加入请求ID、过期时间和加锁次数字段
- 获取锁时校验请求ID,若相同则获取到锁且加锁次数+1
- 获取到锁时校验过期时间,若已过期则可以获取到该锁。
完整代码:
static ThreadLocal<String> requestIdTL = new ThreadLocal<>();
public List<TTagLock> selectAll() {
return testMapper.selectAll();
}
/**
** 获取当前线程requestid
** @return
**/
public static String getRequestId() {
String requestId = requestIdTL.get();
if (requestId == null || "".equals(requestId)) {
requestId = UUID.randomUUID().toString();
requestIdTL.set(requestId);
}
System.out.println("requestId: "+requestId);
return requestId;
}
/**
* 初始化记录,如果有记录update,如果没有记录insert
*/
private TTagLock initTTagLock(String key){
// 查询记录是否存在
TTagLock tTagLock = testMapper.queryRecord(key);
if (null == tTagLock) {
// 记录不存在,创建
tTagLock = new TTagLock();
tTagLock.setFlockName(key);
tTagLock.setFcount(0);
tTagLock.setFdesc("");
tTagLock.setFdeadline(new Date(0));
tTagLock.setFstatus(1);
tTagLock.setFRequestId(getRequestId());
testMapper.insertRecord(tTagLock);
}
return tTagLock;
}
/**
* 获取中央锁Key
*/
private boolean getCenterLock(String key){
String prefix = "center_lock_";
CRC32 crc32 = new CRC32();
crc32.update(key.getBytes());
Long hash = crc32.getValue();
if (null == hash){
return false;
}
Integer len = hash.toString().length();
String slot = hash.toString().substring(len-2);
String centerLockKey = prefix + slot;
testMapper.queryRecord(centerLockKey);
return true;
}
/**
* 获取锁,代码片段
*/
@Transactional
public boolean getLock(String lockName,String desc,Long expireTime) throws InterruptedException {
// 检测参数
if(StringUtils.isEmpty(lockName)) {
System.out.println("参数为空");
return false;
}
// 获取中央锁,初始化记录
Long nowTime = new Date().getTime();
getCenterLock(lockName);
TTagLock tTagLock = initTTagLock(lockName);
// 未释放锁或未过期,获取失败
if (tTagLock.getFstatus() == 1
&& tTagLock.getFdeadline().getTime() > nowTime && !getRequestId().equals(tTagLock.getFRequestId())){
Thread.sleep(50);
return false;
}
if(getRequestId().equals(tTagLock.getFRequestId())){//重入锁
int num = testMapper.updateRecord(lockName, tTagLock.getFdeadline(), tTagLock.getFcount()+1,
tTagLock.getFdesc(), 1,getRequestId());
return true;
}
// 获取锁
Date deadline = new Date(nowTime + expireTime);
int num = testMapper.updateRecord(lockName, deadline, 1, desc, 1,getRequestId());
return true;
}
public void unLock(String lockName) {
//获取当前线程requestId
String requestId = getRequestId();
TTagLock tTagLock = testMapper.queryRecord(lockName);
//当前线程requestId和库中request_id一致 && lock_count>0,表示可以释放锁
if (Objects.nonNull(tTagLock) && requestId.equals(tTagLock.getFRequestId()) && tTagLock.getFcount() > 0) {
if (tTagLock.getFcount() == 1) {
//重置锁
resetLock(tTagLock);
} else {
testMapper.updateRecord(lockName,tTagLock.getFdeadline(),tTagLock.getFcount()-1,tTagLock.getFdesc(),1,getRequestId());
}
}
}
public int resetLock(TTagLock tTagLock) {
tTagLock.setFRequestId("");
tTagLock.setFcount(0);
tTagLock.setFdeadline(new Date());
//todo 修改update
return testMapper.updateRecord(tTagLock.getFlockName(),new Date(),0,"",0,"");
}
2023.3.26 下午 13.51分 于杭州