Lock接口
jdk1.5之后出现的Lock接口提供synchronized所不具备的三个特性: 他可以尝试非阻塞的获取锁; 能够被中断的获取锁; 超时获取锁. 它可以显示的获取和释放锁
Lock的实现依赖AQS,AQS通过一个volatile整形变量来维护同步状态, 基于这个volatile变量的内存语义, Lock接口实现了内存可见性.
# 重入锁
重入锁即支持重进入的锁, 表示该锁能够支持对一个资源的重复加锁.
先实现一个不支持重入的独占锁, 而后在其基础上改进为可重入锁.
public class Mutex implements Lock {
private final Sync sync = new Sync();
private static class Sync extends AbstractQueuedSynchronizer{
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if (getState() == 0){
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
/**
* 当前是否处于独占状态
* @return
*/
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
ConditionObject newCondition(){
return new ConditionObject();
}
}
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1,unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
这把不支持重入的互斥锁中的AQS共有两个状态0, 1 . 0表示锁未被占用, 1表示锁已被某个线程占用, 后续线程尝试加锁时将进入同步队列阻塞. 这把锁的AQS的tryAcquire()方法的实现没有考虑已经持有锁的线程尝试再次调用lock()方法时的情况, 当再次调用时已持有锁的线程也将会被阻塞.
synchronized隐式的支持重入.
Lock接口的实现类ReentrantLock实现了可重入. 同时ReentrantLock的构造函数还可以指定锁是否是公平锁. 公平的获取锁即等待时间最长的线程 优先获取锁. 反之是不公平的.
# 可重入的实现
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 当AQS的同步状态已被持有时, ReentrantLock加入了同步状态的持有者判断
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 若是已持有锁的线程尝试重复加锁会将AQS的state增加
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// ReentrantLock的最终释放锁的条件是state == 0
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 公平与非公平的实现
# 非公平锁的实现
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 在非公平锁的实现中, 只要能够CAS修改AQS的state成功即可获取到锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
...
}
2
3
4
5
6
7
8
9
10
11
12
13
# 公平锁的实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 多出了一步hasQueuedPredecessors()
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
...
}
2
3
4
5
6
7
8
9
10
11
12
13
在公平锁的实现中多出了一步判断当前节点是否有前驱节点, 如果有表示已经有线程在更早之前就已经请求锁了.
ReentrantLock默认实现是非公平锁, 原因是非公平锁不会引起大量的线程切换. 但需要注意的是非公平锁可能会造成线程饥饿.
# 读写锁
读写锁在同一时刻允许多个读线程同时访问, 但是在写线程访问时, 所有的读线程和其他写线程将被阻塞.
java并发包中提供读写锁的实现是ReentrantReadWriteLock. 它提供公平性选择, 支持重入, 支持锁降级. 遵循获取写锁, 获取读锁再释放写锁的次序, 写锁能够降级为读锁.
ReentrantReadWriteLock实现了ReadWriteLock接口, 同时还对外提供了一些便于检测内部状态的方法.
方法 | 描述 |
---|---|
int getReadLockCount() | 返回当前读锁被获取的次数. 这个次数不等于获取读锁的线程数. 例如一个线程获取了读锁多次(重进入), 这个方法将返回n而不是1 |
int getReadHoldCount() | 返回当前线程获取读锁的次数. jdk1.6之后使用ThreadLocal保存当前线程获取的次数 |
boolean isWriteLocked() | 判断写锁是否正在被占用 |
int getWriteHoldCount() | 返回当前写锁被获取次数 |
# 设计
ReentrantReadWriteLock内部的AQS采用按位切割使用的方式在一个整形变量上维护了多种状态. 读写锁将这个变量切分成两部分, 高16位表示读, 低16位表示写.
读写锁通过对这个变量做位运算确定读写状态. 写状态 = state & 0x0000FFFF. 读状态 = state >>> 16(无符号补0右移16位). 当写状态增加1时, 等于state + 1. 当读状态增加1时, 等于 state + (1 << 16), 也就是state + 0x00010000.
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
...
private final ReadLock readLock;
private final WriteLock writeLock;
// readLock和writeLock公布想Sync
final Sync sync
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
};
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 写锁的获取与释放
读写锁的写锁是一个支持重入的排他锁. 若当前线程已经获取了写锁, 则增加写状态. 若获取写锁时, 读锁已被获取, 或写锁已被其他线程获取, 则线程进入等待.
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
// 对应第一种情况
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 对应第二种情况
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
// 对应第三种情况
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
写锁的释放与ReentrantLock类似, 每次释放减少写状态.
# 读锁的获取与释放
读锁是一个支持重进入的共享锁.
// 这部分代码参考Java并发编程的艺术, 忽略了与getReadHoldCount()相关的代码
protected final int tryAcquireShared(int unused){
for(;;){
int c = getState();
int nextc = c = (1 << 16);
if (nextc < c){
throw new Error("Maximum lock count exceeded");
}
if(exclusiveCount(c) != 0 && owner != Thread.currentThread()){
return -1;
}
if(compareAndSetState(c, nextc)){
return 1;
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 锁降级
锁降级是指当前线程持有写锁, 又获取读锁, 再释放写锁的过程.
示例
private volatile boolean update;
public void processData(){
readLock.lock();
if(!update){
readLock.unLock();
// 开始锁降级
writeLock.lock();
try{
if(!update){
update = true;
}
readLock.lock();
}finally{
writeLock.unLock();
}
// 锁降级完成.
}
try{
// ...使用数据
}finally{
readLock.unLock();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 为何必须要获取到读锁
只有在释放写锁之前同时拿到读锁, 才可以起到阻塞其他见缝插针的写线程拿到写锁并写入数据, 导致当前线程无法感知其他线程的数据更新问题
# 为何不支持锁升级
写锁只有在读锁未被持有的情况下才能获取
# LockSupport
它提供了线程的阻塞与唤醒功能, 其中park方法的blocker对象可以为开发人员提供详细的堆栈信息.
# Condition
Condition提供类似Object的监视方法
Condition是AQS的内部类, 因此每个Condition都拥有AQS的引用
ConditionObject内部维护了一个AQS的同步队列, 当调用了awit()时, 相当于同步队列的首节点移动到了等待队列中.
当调用signal()时会将等待队列中等待时间最长的节点移动到同步队列中. 当这个节点从同步队列中获取到锁时, 从awit()返回.