多线程(四 ) Lock的使用及原理
Lock 的使用
我们知道 synchronize 关键字可以实现线程安全,这个是 jvm 层面帮我们实现的线程安全操作。而 lock 是在 jdk 层次实现的线程安全,那么 lock 怎么使用呢。
ReentrantLock
ReentrantLock:可重入锁,支持重入。简单意思上来说,就是当线程 1 拿到锁的时候,线程 1 进入同一把锁的其他方法的时候,可以直接进入,增加重入次数。ReentrantLock 的使用
创建一个 Lock 对象。
调用 lock 方法加锁,下面执行的代码都在同步代码块
调用 unlock 方法,释放锁资源。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30public class ReentrantLockDemo {
private static int count = 0;
static Lock lock = new ReentrantLock();
public static void inc(){
lock.lock();
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
count++;
lock.unlock();
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i <1000 ; i++) {
new Thread(() ->inc()).start();
}
Thread.sleep(3000);
System.out.println(count);
}
}ReentrantReadWriteLock
ReentrantReadWriteLock:读写锁,拥有 2 把锁,分别为读锁和写锁。在实际情况中,数据大多数情况下都是读取,采用读锁(乐观锁)可以减少资源消耗。读取数据时使用读锁,更新数据时使用写锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28public class ReadWriteLockDemo {
public Map<String,Object> cacheMap = new HashMap<>();
static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
static Lock read = readWriteLock.readLock();
static Lock writeLock = readWriteLock.writeLock();
public Object get(String key){
read.lock();
try {
return cacheMap.get(key);
} finally {
read.unlock();
}
}
public void put(String key,String value){
writeLock.lock();
try {
cacheMap.put(key,value);
} finally {
writeLock.unlock();
}
}
}我们来看一下 lock 的关系图
ReentrantLock 实现了 Lock 接口,ReentrantLock 中的内部类 Sync 继承了 AbstractQueuedSynchronizer,实现了 AQS(AbstractQueuedSynchronizer 的简称)的基本功能。
JUC 基本理论
JUC:java.util.concurrent java 并发工具包,这个包下面基本上实现了多线程下的各种操作。
CAS:compareaAndSwap 由于编译器优化,指令重排,cpu 调度等一些原因,导致非原子操作不能保证同一个事物执行,而执行 cas 操作能保证连续操作同时执行。cas 操作中有预期值,内存值,更新值,当预期值和内存值相等的时候,讲内存中的数据修改成更新值。
AQS:AbstractQueuedSynchronizer ,同步队列,它是实现线程同步的核心组件。AQS 的功能分为 2 种,独占和共享。独占锁,每次只能一个线程持有锁,比如 ReentrantLock;共享锁,允许多个线程同时获取锁,并发访问共享资源,比如 ReentrantReadWriteLock。AQS 的内部维护了一个 FIFO 的双向链表,AQS 定义了内部类 Node,里面含有后继节点指针和前驱节点指针。每个 Node 其实由线程封装,当线程抢锁失败之后会封装成 node 加入到 AQS 队列中,当获取锁的线程释放锁之后,会从队列中唤醒一个阻塞的节点。
![image-20210311202725114]/images/image-20210311202725114.png)
AQS 队列增加线程及释放线程流程
增加线程
新的线程封装成 node 追加到同步队列,设置 prev 节点,将当前节点的 next 节点指向自己
通过 cas 操作将 tail 节点指向新的 node 节点
释放锁
修改 head 节点指向下一个 node。下一个 node 的 prev 指针指向空
新获取锁的的节点的 next 节点指向空
修改 head 节点指向的时候不需要 cas 操作,因为这个时候是获取锁的线程进行操作,只会当前线程独占,只需要将 head 设置为原首节点的后继节点,并且断开原首节点的 next 引用即可。
ReentrantLock 源码分析
ReentrantLock lock 操作时序图
ReentrantLock 加锁的入口
1 | public void lock() { |
sync 是 ReentrantLock 的内部类,继承了 AQS 来实现重入锁的逻辑。AQS 是一个同步队列,它能够实现线程的阻塞和唤醒,但是它不具备业务功能,所以在不同的同步场景中,会继承 AQS 来实现对应场景的功能
Sync 有两个实现类,分别是:
NonfairSync:非公平锁,不管当前队列是否存在其他线程等待,新线程都会有机会直接抢占资源
FairSync:公平锁,所有线程严格按照 FIFO 队列顺序执行
我们这里以 NonfairSync 为例来追踪程序执行步骤
1 | final void lock() { |
1 | protected final boolean compareAndSetState(int expect, int update) { |
这段 cas 代码的意思是如果当前内存中的 state 值和预期 expect 值相等,就更新成 update 值,否则返回 false。
state 是 AQS 的同步标识,当 state=0 时,表示无锁状态。当 state>0 的时候,表示已经有锁获取了线程,并且由于 ReentrantLock 支持重入,那么每重入一下,state 的值就会+1。在释放锁的时候,需要多次释放 state,知道 state 等于 0.
unsafe 是 java 留下的后门,可以直接进行内存访问、线程的挂起和恢复、CAS、线程同步、内存屏障等,而 CAS 是 unsafe 提供的原子操作。
假如 CAS 操作失败,程序执行 acquire(1)操作
1 | public final void acquire(int arg) { |
这个方法的主要操作如下:
1.调用 tryAcquire 尝试去获取独占锁, 如果成功则返回 true,否则返回 false
2.如果 tryAcquire 失败,则调 addWaiter 方法将当前节点封装成 node 添加到 AQS 队尾
3.调用 acquireQueued 方法通过自旋尝试去获取锁
tryAcquire 方法
1 | protected final boolean tryAcquire(int acquires) { |
1 | final boolean nonfairTryAcquire(int acquires) { |
nonfairTryAcquire 方法的操作主要如下
判断当前是否释放线程资源,如果已经释放,就直接 cas 抢占资源。
判断当前获取锁的线程是否为当前线程,如果是的话,就直接增加重入次数。
假如上一步抢夺资源失败,进入 addWaiter 方法
1 | private Node addWaiter(Node mode) { |
addWaiter 方法的操作如下:
将当前线程封装成 node 节点
判断 AQS 同步队列队尾(tail)指针是否为空,如果不为空就通过 cas 操作将当前 node 加入到 AQS 队尾
enq 方法主要是不断自旋直到将当前 node 节点添加到同步队列
1 | */ |
当 3 个线程同时运行的时候的流程图大致如下
再回到 AQS 的 acquireQueued 方法中
1 | final boolean acquireQueued(final Node node, int arg) { |
这个方法的操作主要如下:
获取当前线程 node 的上一个节点,如果节点是 head,那么久有资格去抢夺资源。
调用 tryAcquire 抢夺资源成功之后,把获得锁的线程 node 改成 head 节点,并且移除原来的 head 节点,
如果抢夺失败,那么根据 waitStatus 判断是否需要挂起线程,否则重复进行自旋。
shouldParkAfterFailedAcquire 方法
如果 ThreadA 没有释放资源,那么 ThreadB 和 ThreadC 来争夺锁肯定会失败,那么会执行 shouldParkAfterFailedAcquire 方法。
Node 中 waitStatus 有 5 种状态
CANCELLED(1):在同步队列中等待超时或者被中断,需要重同步队列中删掉节点
SIGAL(-1):只要前置节点释放锁,SINAL 状态的后续节点就可以抢夺锁
CONDITION(-2):和 condition 有关
PROPAGATE(-2):共享模式下,该状态下的线程处于可运行状态
默认状态 0
1 | private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { |
此时线程状态图如下:
ReentrantLock 释放锁过程
1 | public void unlock() { |
1 | public final boolean release(int arg) { |
tryRelease方法
1 | protected final boolean tryRelease(int releases) { |
unparkSuccessor方法
1 | private void unparkSuccessor(Node node) { |
此时线程状态图如下
condition.await和condition.signal
1 | public class ConditionWait implements Runnable{ |
这里存在2个队列,一个是AQS队列,一个是condition等待队列。
阶段1:当await线程执行的时候,添加到AQS队列,由于此时还没有锁竞争,所以awiat线程直接获取到锁,添加到AQS队列的head节点。然后调用condition.await(),将当前线程从AQS队列移动到condition队列,在自旋一段时间之后,直接park挂起线程。
阶段2:当sinal线程启动的时候,此时AQS队列的线程也是空的,所以可以直接运行同步代码。然后调用condition.signal()将condition队列中的await线程移动到AQS队列,由于此时sinal线程还占用同步锁,所以sinal线程执行完之后,AQS队列的head指针从sinal线程指向了await线程,这个时候await线程继续执行后续代码。
深圳市其域创新科技有限公司