AQS原理
Java中的大部分同步类(Lock、Semaphore、ReentrantLock等)都是基于AbstractQueuedSynchronizer(简称为AQS)实现的。
AQS是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架。
实现一个不可重入的独占锁
关键
外部类实现Lock接口
组合一个实现了AQS的内部类
- 重写tryAcquire、tryRelease、isHeldExclusively方法
利用AQS的实现类实现Lock接口的所有方法
public class MyLock implements Lock {
// 独占锁
class MySync extends AbstractQueuedLongSynchronizer {
@Override
protected boolean tryAcquire(long arg) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(long arg) {
setExclusiveOwnerThread(null);
// volatile变量在后
setState(0);
return true;
}
@Override // 是否持有独占锁
protected boolean isHeldExclusively() {
return getState() == 1;
}
public Condition newCondition() {
return new ConditionObject();
}
}
private MySync sync = new MySync();
@Override
public void lock() { // 尝试加锁,若失败则进入队列等待
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException { // 加锁 可打断
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() { // 尝试一次加锁 若失败则返回false
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1,unit.toNanos(time));
}
@Override
public void unlock() { // 解锁
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
ReentrantLock
状态-1 代表有责任唤醒后记
可重入原理
获取锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
c!=0 会判断当前线程是不是持有锁的线程,若是则将c+1
释放锁
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
每次state - 1,直到0才能成功释放
可打断原理
不可打断的代码分析
就算其他线程调用了打断也不能立刻响应打断
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
// 返回true
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private final boolean parkAndCheckInterrupt() {
// 如果打断标记已经是true 则park失效
LockSupport.park(this);
// interrupted();会清除打断标记
return Thread.interrupted();
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
selfInterrupt();
}
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
可打断的
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 区别就是在这里!
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
非公平/公平原理
非公平的tryAcquire方法会直接去CAS设置State,而没有检查AQS的队列
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
公平的tryAcquire方法 !hasQueuedPredecessors()
会判断队列中是否还有节点
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}