前言:
上一篇我们了解了Lock接口与Condition接口。本篇来看看J.U.C中的ReadWriteLock,再次膜拜一下Doug Lea大神的杰作。
面试问题 Q :谈谈对ReadWriteLock的理解?
1.ReadWriteLock简介 ReadWriteLock接口是在JDK5提供的,具体的实现类为ReentrantReadWriteLock,还有一个实现类ReadWriteLockView,是StampedLock的内部类。后边会有讲到。
1 2 3 4 public interface ReadWriteLock { Lock readLock () ; Lock writeLock () ; }
ReadWriteLock直译为读写锁,从接口命名上就可以看出该工具类用于特定的场景下,前面讲过的ReentrantLock和synchronized基本可以用来解决一切并发问题,但在特定的场景下可能表现的效果不那么令人满意,如在读多写少的时,大部分线程都在进行读操作,很少有线程会修改共享数据。但由于加锁的特性,导致大量的读操作进行了不必要的锁竞争,如果能将读写的锁分离,有写操作的时候,进行读操作需要加锁;没有写操作的时候,可以多个线程同时进行读操作。这样势必会提升性能。
读写锁便是解决这种场景问题的。读写锁有三个基本原则:
允许多个线程同时读共享变量
只允许一个线程写共享变量
如果一个写线程正在执行,此时禁止读线程读共享变量,如果一个线程在读,同样也禁止写共享变量。
2.ReentrantReadWriteLock使用 2.1 锁降级 1 2 3 ReadWriteLock readWriteLock=new ReentrantReadWriteLock (); Lock readLock = readWriteLock.readLock()Lock writeLock = readWriteLock.writeLock()
可以看出无论是读锁还是写锁都是Lock接口的实现类,那么上一篇中提到Lock接口的三种加锁方式都可以使用。
1 2 3 4 5 6 void lockInterruptibly () throws InterruptedException;boolean tryLock (long time, TimeUnit unit) throws InterruptedException;boolean tryLock () ;
Reentrant代表可重入的,ReentrantReadWriteLock支持重入锁,而且也支持公平锁和非公平锁。
前面简单的介绍的读写锁的使用,这里有一个需要注意的点,就是读写锁的升级和降级。
ReentrantReadWriteLock不支持锁的升级 ,但是支持锁的降级 。锁降级就是持有写锁去申请读锁;锁升级是持有读锁去申请写锁,如果出现类似锁升级的代码,则会导致线程阻塞,且无法被唤醒。这点需要注意。
锁的升级:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock (); @Test public void test2 () throws InterruptedException { Thread thread = new Thread (() -> { readWriteLock.readLock().lock(); System.out.println("获取读锁" ); readWriteLock.writeLock().lock(); System.out.println("获取写锁" ); readWriteLock.writeLock().unlock(); System.out.println("释放写锁" ); readWriteLock.readLock().unlock(); System.out.println("释放读锁" ); }); thread.start(); thread.join(); }
锁的降级:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock (); @Test public void test () throws InterruptedException { Thread thread = new Thread (() -> { readWriteLock.writeLock().lock(); System.out.println("获取写锁" ); readWriteLock.readLock().lock(); System.out.println("获取读锁" ); readWriteLock.readLock().unlock(); System.out.println("释放写锁" ); readWriteLock.writeLock().unlock(); System.out.println("释放读锁" ); }); thread.start(); thread.join(); }
锁降级还是有很多应用场景的。比如有业务需要先查缓存,发现缓存失效需要重新去数据库查询数据并修改缓存,完成修改操作后应该尽快释放写锁,减小锁的粒度。这样能让更多的读线程尽快访问到修改后的数据。不然业务逻辑半天执行不完,这期间尽管缓存数据是最新的,但是由于写锁未释放,其他线程也无法进行读操作。极大的降低了并发性。
因此在完成修改缓存后去拿读锁,然后释放写锁,这样既能保证其他线程读取到最新的数据,又能保证当前线程的后续操作使用的数据是最新的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock (); final Lock r = readWriteLock.readLock(); final Lock w = readWriteLock.writeLock(); volatile boolean cacheValid; @Test public void processCachedData () { r.lock(); if (!cacheValid){ r.unlock(); w.lock(); try { if (!cacheValid){ cacheValid =true ; } r.lock(); }finally { w.unlock(); } } try { }finally { r.unlock(); } }
2.2 写锁支持条件变量 读锁不支持条件变量 ,如果读锁调用newCondition()会抛出UnsupportedOperationException异常;
写锁支持条件变量 ,锁与条件变量的搭配使用可以参考上一篇 Lock & Condition 。
3.ReentrantReadWriteLock原理 ReentrantReadWriteLock内部还是使用的AQS框架,通过前面的学习我们知道,在AQS中,通过volatile int state
来表示线程锁的状态,ReentrantReadWriteLock有两把锁:读锁和写锁,它们保护的都是同一个资源,如何用一个共享变量来区分写锁和读锁的状态呢?答案就是按位拆分。
由于state是int类型的变量,在内存中占用4个字节,也就是32位。将其拆分为两部分:高16位和低16位,其中高16位用来表示读锁状态,低16位用来表示写锁状态。这样就可以用一个int变量来表示两种锁的状态,低16位写锁的加锁和释放锁操作不会发生变化,仍是state+1/state-1;但高16位的加锁和释放锁就变成了state + (1<<16)/ state-(1<<16)。
同样获取读锁和写锁的状态也有所不同:
获取读锁:state >>> 16
无符号右移16位,空的地方补0。
获取写锁:state & [(1 << 16) - 1]
相当于state & 0x0000FFFF 相当于把高16位置空,只保留低16位。
由于读锁和写锁的状态值都只占用16位,所以读锁和写锁各自可重入锁的最大数量为2^16-1。
前面说了持有锁状态表示的问题,现在来看看其具体的实现。在此之前先了解一下ReentrantReadWriteLock的类结构。
ReentrantReadWriteLock中有两个内部类,ReadLock和WriteLock,这两个类在具体实现Lock接口时,分别调用ReentrantReadWriteLock中实现AQS类的同步组件Sync的共享和独占两种加锁释放锁方式来实现各自的功能。
Sync中实现AQS中独占锁加锁tryAcquire()和独占锁释放锁tryRelease(),以及共享锁的加锁tryAcquireShared()和共享锁的释放锁tryReleaseShared()。
下面的内容也是围绕这四个方法展开。
3.1 写锁加锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 protected final boolean tryAcquire (int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0 ) { if (w == 0 || current != getExclusiveOwnerThread()) return false ; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error ("Maximum lock count exceeded" ); setState(c + acquires); return true ; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false ; setExclusiveOwnerThread(current); return true ; }
下面附上tryAcquire()的流程图:
tryAcquire返回false的后续操作。
1 2 3 4 5 6 7 8 9 10 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
3.2 写锁释放 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 protected final boolean tryRelease (int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0 ; if (free) setExclusiveOwnerThread(null ); setState(nextc); return free; }
3.3 读锁加锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 protected final int tryAcquireShared (int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1 ; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0 ) { firstReader = current; firstReaderHoldCount = 1 ; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0 ) readHolds.set(rh); rh.count++; } return 1 ; } return fullTryAcquireShared(current); }
下面附上tryAcquireShared()的流程图:
3.4 读锁释放 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 protected final boolean tryReleaseShared (int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { if (firstReaderHoldCount == 1 ) firstReader = null ; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1 ) { readHolds.remove(); if (count <= 0 ) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0 ; } }
前面提到的四个方法,是Doug Lea大神留给我们的主要的发挥空间,AQS中其他核心方法都无法重写。如果需要实现自定义的同步组件,那么对这四个方法必然要深入理解,然后根据组件特性来实现独占锁或共享锁。如ReentrantLock是独占锁,所以其内部只实现了tryAcquire()和tryRelease()。因此本篇着重介绍了ReentrantReadWriteLock实现AQS的具体细节,没有从AQS框架整体上展开,有些地方可能不太好理解,还希望大家多多谅解。
4.总结 在ReentrantReadWriteLock中实现了独占锁和共享锁两种方式,读锁是共享的,可以在没有写锁的时候被多个线程同时持有,并发地读数据;写锁是独占的,每次只能被一个线程持有,其他线程要想修改共享数据,则需要排队等待。获得了读锁的线程能够看到前一个释放的写锁所更新的内容。
理论上,读写锁比互斥锁允许对于共享数据更大程度的并发。与互斥锁相比,读写锁是否能够提高性能取决于读写数据的频率、读取和写入操作的持续时间以及读线程和写线程之间的竞争。
ReentrantReadWriteLock不支持锁的升级 ,但是支持锁的降级 。
ReentrantReadWriteLock的读锁不支持条件变量 ,但写锁支持条件变量。
Reference 《Java 并发编程实战》 《Java 编程思想(第4版)》 https://juejin.im/post/5dc22993f265da4cf77c8ded http://www.tianxiaobo.com
感谢阅读 !