Java AQS中ReentrantLock条件锁的使用

一.什么是AQS

1.定义

java.util.concurrent包中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这些行为的抽象就是基于AbstractQueuedSynchronizer(简称AQS)实现的,AQS是一个抽象同步框架,可以用来实现一个依赖状态的同步器。

JDK中提供的大多数的同步器如Lock, Latch, Barrier等,都是基于AQS框架来实现的。

  • 一般都是通过一个内部类sync继承AQS
  • 将同步器所有调用都同步到Sync对应的方法

2.特性

  • 阻塞等待队列
  • 共享/独占
  • 公平/非公平
  • 可重入
  • 允许中断

3.属性

内部维护属性volatile int state,表示资源的可用状态

  • getState()
  • setState()
  • compareAndSetState()

4.资源共享方式

  • Exclusive-独占,只有一个线程能执行,如ReentrantLock
  • Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch

5.两种队列

  • 同步等待队列: 主要用于维护获取锁失败时入队的线程
  • 条件等待队列: 调用await()的时候会释放锁,然后线程会加入到条件队列,调用signal()唤醒的时候会把条件队列中的线程节点移动到同步队列中,等待再次获得锁

6.队列节点状态

  • 值为0,初始化状态,表示当前节点在sync队列中,等待着获取锁。
  • CANCELLED,值为1,表示当前的线程被取消;
  • SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
  • CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
  • PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;

7.实现方法

自定义同步器实现时主要实现以下几种方法:

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

二.等待队列

1.同步等待队列

AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表数据结构的队列,是FIFO先进先出线程等待队列,Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。

AQS 依赖CLH同步队列来完成同步状态的管理:

  • 当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程
  • 当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
  • 通过signal或signalAll将条件队列中的节点转移到同步队列。(由条件队列转化为同步队列)

2.条件等待队列

AQS中条件队列是使用单向列表保存的,用nextWaiter来连接:

  • 调用await方法阻塞线程;
  • 当前线程存在于同步队列的头结点,调用await方法进行阻塞(从同步队列转化到条件队列)

三.condition接口

  • 调用Condition#await方法会释放当前持有的锁,然后阻塞当前线程,同时向Condition队列尾部添加一个节点,所以调用Condition#await方法的时候必须持有锁。
  • 调用Condition#signal方法会将Condition队列的首节点移动到阻塞队列尾部,然后唤醒因调用Condition#await方法而阻塞的线程(唤醒之后这个线程就可以去竞争锁了),所以调用Condition#signal方法的时候必须持有锁,持有锁的线程唤醒被因调用Condition#await方法而阻塞的线程。
public static void main(String[] args) {
 Lock lock = new ReentrantLock();
 Condition condition = lock.newCondition();
 new Thread(() -> {
 lock.lock();
 try {
 log.debug(Thread.currentThread().getName() + " 开始处理任务");
 //会释放当前持有的锁,然后阻塞当前线程
 condition.await();
 log.debug(Thread.currentThread().getName() + " 结束处理任务");
 } catch (InterruptedException e) {
 e.printStackTrace();
 } finally {
 lock.unlock();
 }
 }).start();
 new Thread(() -> {
 lock.lock();
 try {
 log.debug(Thread.currentThread().getName() + " 开始处理任务");
 Thread.sleep(2000);
 //唤醒因调用Condition#await方法而阻塞的线程
 condition.signal();
 log.debug(Thread.currentThread().getName() + " 结束处理任务");
 } catch (Exception e) {
 e.printStackTrace();
 } finally {
 lock.unlock();
 }
 }).start();
 }

Thread-0 开始处理任务

Thread-1 开始处理任务

Thread-1 结束处理任务

Thread-0 结束处理任务

四.ReentrantLock

1.ReentrantLock是什么

ReentrantLock是一种基于AQS框架的应用实现,是JDK中的一种线程并发访问的同步手段,它的功能类似于synchronized是一种互斥锁,可以保证线程安全。

2.特点

  • 可中断
  • 可以设置超时时间
  • 可以设置为公平锁
  • 支持多个条件变量
  • 与 synchronized 一样,都支持可重入

3. ReentrantLock和synchronized的区别

  • synchronized是JVM层次的锁实现,ReentrantLock是JDK层次的锁实现;
  • synchronized的锁状态是无法在代码中直接判断的,但是ReentrantLock可以通过ReentrantLock#isLocked判断;
  • synchronized是非公平锁,ReentrantLock可以是公平也可以是非公平的,默认是非公平的;
  • synchronized是不可以被中断的,而ReentrantLock#lockInterruptibly方法是可以被中断的;
  • 在发生异常时synchronized会自动释放锁,而ReentrantLock需要开发者在finally块中显示释放锁;
  • ReentrantLock获取锁的形式有多种:如立即返回是否成功的tryLock(),以及等待指定时长的获取,更加灵活;
  • synchronized在特定的情况下对于已经在等待的线程是后来的线程先获得锁(回顾一下sychronized的唤醒策略),而ReentrantLock对于已经在等待的线程是先来的线程先获得锁;

4. ReentrantLock的使用

伪代码:

ReentrantLock lock = new ReentrantLock(); //参数默认false,不公平锁 
ReentrantLock lock = new ReentrantLock(true); //公平锁 
//加锁 
lock.lock(); 
try { 
 //临界区 
} finally { 
 // 解锁 
 lock.unlock();

例子:基本使用

private static int sum = 0;
 private static Lock lock = new ReentrantLock();
 public static void main(String[] args) throws InterruptedException {
 for (int i = 0; i < 3; i++) {
 Thread thread = new Thread(()->{
 //加锁 一般写在try前面
 lock.lock();
 try {
 // 临界区代码 业务逻辑
 for (int j = 0; j < 10000; j++) {
 sum++;
 }
 } finally {
 // 解锁
 lock.unlock();
 }
 });
 thread.start();
 }
 Thread.sleep(2000);
 System.out.println(sum);
 }

30000

可重入

public static ReentrantLock lock = new ReentrantLock();
 public static void main(String[] args) {
 method1();
 }
 public static void method1() {
 lock.lock();
 try {
 log.debug("execute method1");
 method2();
 } finally {
 lock.unlock();
 }
 }
 public static void method2() {
 lock.lock();
 try {
 log.debug("execute method2");
 method3();
 } finally {
 lock.unlock();
 }
 }
 public static void method3() {
 lock.lock();
 try {
 log.debug("execute method3");
 } finally {
 lock.unlock();
 }
 }

execute method1

execute method2

execute method3

可中断

public static void main(String[] args) throws InterruptedException {
 ReentrantLock lock = new ReentrantLock();
 Thread t1 = new Thread(() -> {
 log.debug("t1启动...");
 try {
 lock.lockInterruptibly();
 try {
 log.debug("t1获得了锁");
 } finally {
 lock.unlock();
 }
 } catch (InterruptedException e) {
 e.printStackTrace();
 log.debug("t1等锁的过程中被中断");
 }
 }, "t1");
 lock.lock();
 try {
 log.debug("main线程获得了锁");
 t1.start();
 //先让线程t1执行
 Thread.sleep(1000);
 t1.interrupt();
 log.debug("线程t1执行中断");
 } finally {
 lock.unlock();
 }
 }

main线程获得了锁

t1启动…

线程t1执行中断

t1等锁的过程中被中断

锁超时

public static void main(String[] args) throws InterruptedException {
 ReentrantLock lock = new ReentrantLock();
 Thread t1 = new Thread(() -> {
 log.debug("t1启动...");
 try {
 //if (!lock.tryLock()) {
 // log.debug("t1获取锁失败,立即返回false");
 // return;
 //}
 	if (!lock.tryLock(1, TimeUnit.SECONDS)) {
 log.debug("等待 1s 后获取锁失败,返回");
 return;
 }
 } catch (Exception e) {
 e.printStackTrace();
 return;
 }
 try {
 log.debug("t1获得了锁");
 } finally {
 lock.unlock();
 }
 }, "t1");
 lock.lock();
 try {
 log.debug("main线程获得了锁");
 t1.start();
 //先让线程t1执行
 Thread.sleep(2000);
 } finally {
 lock.unlock();
 }
 }

main线程获得了锁

t1启动…

等待 1s 后获取锁失败,返回

公平锁和非公平锁

public static void main(String[] args) throws InterruptedException {
// ReentrantLock lock = new ReentrantLock(true); //公平锁
 ReentrantLock lock = new ReentrantLock(); //非公平锁
 for (int i = 0; i < 500; i++) {
 new Thread(() -> {
 lock.lock();
 try {
 try {
 Thread.sleep(10);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 log.debug(Thread.currentThread().getName() + " running...");
 } finally {
 lock.unlock();
 }
 }, "t" + i).start();
 }
 // 1s 之后去争抢锁
 Thread.sleep(1000);
 for (int i = 0; i < 500; i++) {
 new Thread(() -> {
 lock.lock();
 try {
 log.debug(Thread.currentThread().getName() + " running...");
 } finally {
 lock.unlock();
 }
 }, "强行插入" + i).start();
 }
 }

条件变量

private static ReentrantLock lock = new ReentrantLock();
 private static Condition cigCon = lock.newCondition();
 private static Condition takeCon = lock.newCondition();
 private static boolean hashcig = false;
 private static boolean hastakeout = false;
 //送烟
 public void cigratee(){
 lock.lock();
 try {
 while(!hashcig){
 try {
 log.debug("没有烟,歇一会");
 cigCon.await();
 }catch (Exception e){
 e.printStackTrace();
 }
 }
 log.debug("有烟了,干活");
 }finally {
 lock.unlock();
 }
 }
 //送外卖
 public void takeout(){
 lock.lock();
 try {
 while(!hastakeout){
 try {
 log.debug("没有饭,歇一会");
 takeCon.await();

 }catch (Exception e){
 e.printStackTrace();
 }
 }
 log.debug("有饭了,干活");
 }finally {
 lock.unlock();
 }
 }
 public static void main(String[] args) {
 ReentrantLockDemo6 test = new ReentrantLockDemo6();
 new Thread(() ->{
 test.cigratee();
 }).start();
 new Thread(() -> {
 test.takeout();
 }).start();
 new Thread(() ->{
 lock.lock();
 try {
 hashcig = true;
 log.debug("唤醒送烟的等待线程");
 cigCon.signal();
 }finally {
 lock.unlock();
 }
 },"t1").start();
 new Thread(() ->{
 lock.lock();
 try {
 hastakeout = true;
 log.debug("唤醒送饭的等待线程");
 takeCon.signal();
 }finally {
 lock.unlock();
 }
 },"t2").start();
 }

没有烟,歇一会
没有饭,歇一会
唤醒送烟的等待线程
唤醒送饭的等待线程
有烟了,干活
有饭了,干活

五.源码解析

首先会调用lock方法

public void lock() {
 sync.lock();
 }

lock会调用公平方法或者非公平的方法,默认是非公平锁方法,非公平锁则会cas尝试加锁,state是不是0,是0的话就把它改为1,并设置当前线程为独占线程,加锁成功,待下个线程进来时已经变成1,则失败阻塞。

加锁

final void lock() {
	// 看状态是不是0,如果是0 则改为1,加锁成功
 if (compareAndSetState(0, 1))
 // 并设置当前线程为独占线程
 setExclusiveOwnerThread(Thread.currentThread());
 else
 	//不是0则失败阻塞
 acquire(1);
 }
protected final void setExclusiveOwnerThread(Thread thread) {
 exclusiveOwnerThread = thread;
 }

加锁失败(入队 阻塞)

public final void acquire(int arg) {
 if (!tryAcquire(arg) &&
 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
 //恢复中断标识位
 selfInterrupt();
 }

首先tryAcquire 又进行了一次判断,看是否能获取锁,

final boolean nonfairTryAcquire(int acquires) {
 final Thread current = Thread.currentThread();
 int c = getState();
 //其他线程进来,状态值是1
 if (c == 0) {
 if (compareAndSetState(0, acquires)) {
 setExclusiveOwnerThread(current);
 return true;
 }
 }
 else if (current == getExclusiveOwnerThread()) {
 	// 重入,将状态值+1
 int nextc = c + acquires;
 if (nextc < 0) // overflow
 throw new Error("Maximum lock count exceeded");
 setState(nextc);
 return true;
 }
 return false;
 }

添加进队列

private Node addWaiter(Node mode) {
 Node node = new Node(Thread.currentThread(), mode);
 // Try the fast path of enq; backup to full enq on failure
 Node pred = tail;
 //第一次tail为空
 if (pred != null) {
 	//尾插法
 node.prev = pred;
 if (compareAndSetTail(pred, node)) {
 pred.next = node;
 return node;
 }
 }
 //tail为空则在这里创建队列
 enq(node);
 return node;
 }

创建队列并且入队

private Node enq(final Node node) {
 for (;;) {
 Node t = tail;
 if (t == null) { // Must initialize
 	//创建队列
 if (compareAndSetHead(new Node()))
 	// 将头节点指向前一节点的尾节点,这时候tail不为空了
 tail = head;
 } else {
 	//双向接口,前一节点的尾节点也指向当前节点的头节点
 node.prev = t;
 if (compareAndSetTail(t, node)) {
 t.next = node;
 return t;
 }
 }
 }
 }

阻塞

final boolean acquireQueued(final Node node, int arg) {
 boolean failed = true;
 try {
 boolean interrupted = false;
 for (;;) { //保证一定获取锁
 	//获取head节点
 final Node p = node.predecessor();
 //是头节点则尝试获取锁
 if (p == head && tryAcquire(arg)) {
 	//设置头节点
 setHead(node);
 p.next = null; // help GC
 failed = false;
 return interrupted;
 }
 //获取锁失败的情况,阻塞,在for循环里,第一次shouldParkAfterFailedAcquire为false,会将其设置为-1,第二次就可以阻塞
 if (shouldParkAfterFailedAcquire(p, node) &&
 parkAndCheckInterrupt())
 interrupted = true;
 }
 } finally {
 if (failed)
 cancelAcquire(node);
 }
 }

是否需要阻塞,把状态设置为SIGNAL,可以被唤醒了

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 int ws = pred.waitStatus;
 //是-1了就可以去阻塞
 if (ws == Node.SIGNAL)
 /*
 * This node has already set status asking a release
 * to signal it, so it can safely park.
 */
 return true;
 if (ws > 0) {
 /*
 * Predecessor was cancelled. Skip over predecessors and
 * indicate retry.
 */
 do { //把节点去掉
 node.prev = pred = pred.prev;
 } while (pred.waitStatus > 0);
 pred.next = node;
 } else {
 /*
 * waitStatus must be 0 or PROPAGATE. Indicate that we
 * need a signal, but don't park yet. Caller will need to
 * retry to make sure it cannot acquire before parking.
 */
 //把状态设置为SIGNAL,可以被唤醒了
 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
 }
 return false;
 }

真正的阻塞方法

private final boolean parkAndCheckInterrupt() {
 	//阻塞
 LockSupport.park(this);
 //清除中断标识位,在加锁失败方法的后面恢复中断标识位,可能其他地方还用到这个锁标识位
 return Thread.interrupted();
 }

唤醒 unlock()

public void unlock() {
 sync.release(1);
 }
 public final boolean release(int arg) {
 	// 尝试唤醒
 if (tryRelease(arg)) {
 Node h = head;
 if (h != null && h.waitStatus != 0)
 	//唤醒阻塞的线程
 unparkSuccessor(h);
 return true;
 }
 return false;
 }
protected final boolean tryRelease(int releases) {
	//当前状态-1
 int c = getState() - releases;
 if (Thread.currentThread() != getExclusiveOwnerThread())
 throw new IllegalMonitorStateException();
 boolean free = false;
 if (c == 0) {
 free = true;
 setExclusiveOwnerThread(null);
 }
 //设置状态
 setState(c);
 return free;
 }

在这里唤醒

private void unparkSuccessor(Node node) {
 /*
 * If status is negative (i.e., possibly needing signal) try
 * to clear in anticipation of signalling. It is OK if this
 * fails or if status is changed by waiting thread.
 */
 int ws = node.waitStatus;
 if (ws < 0)
 compareAndSetWaitStatus(node, ws, 0);
 /*
 * Thread to unpark is held in successor, which is normally
 * just the next node. But if cancelled or apparently null,
 * traverse backwards from tail to find the actual
 * non-cancelled successor.
 */
 Node s = node.next;
 if (s == null || s.waitStatus > 0) {
 s = null;
 for (Node t = tail; t != null && t != node; t = t.prev)
 if (t.waitStatus <= 0)
 s = t;
 }
 //后面一个节点不为空 则直接唤醒当前线程
 if (s != null)
 LockSupport.unpark(s.thread);
 }

线程取消获取锁

private void cancelAcquire(Node node) {
 // Ignore if node doesn't exist
 if (node == null)
 return;
 node.thread = null;
 // Skip cancelled predecessors
 Node pred = node.prev;
 while (pred.waitStatus > 0)
 	//将前一个节点干掉
 node.prev = pred = pred.prev;
 // predNext is the apparent node to unsplice. CASes below will
 // fail if not, in which case, we lost race vs another cancel
 // or signal, so no further action is necessary.
 Node predNext = pred.next;
 // Can use unconditional write instead of CAS here.
 // After this atomic step, other Nodes can skip past us.
 // Before, we are free of interference from other threads.
 node.waitStatus = Node.CANCELLED;
 // If we are the tail, remove ourselves.
 if (node == tail && compareAndSetTail(node, pred)) {
 compareAndSetNext(pred, predNext, null);
 } else {
 // If successor needs signal, try to set pred's next-link
 // so it will get one. Otherwise wake it up to propagate.
 int ws;
 if (pred != head &&
 ((ws = pred.waitStatus) == Node.SIGNAL ||
 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
 pred.thread != null) {
 Node next = node.next;
 if (next != null && next.waitStatus <= 0)
 compareAndSetNext(pred, predNext, next);
 } else {
 unparkSuccessor(node);
 }
 node.next = node; // help GC
 }
 }

至此加锁、解锁、阻塞、唤醒的底层源码都梳理完了。

作者:飞奔的小付原文地址:https://blog.csdn.net/feibendexiaoma/article/details/127662709

%s 个评论

要回复文章请先登录注册