AQS详解(一) 信号量Semaphore的使用

前言

上个算法题说到了信号量,那么就通过信号量来详细的解读下AQS。
首先理解一下信号量的概念,是在多线程环境下使用的一种设施,是可以用来保证两个或多个关键代码段不被并发调用,在进入一个关键代码段之前,线程必须获取一个信号量,一旦该代码段完成了,那么该线程必须释放信号量,其他想进入该关键代码段的线程必须等待直到第一个线程释放信号量。
所以我们可以了解到,信号量多是用来解决资源有限的问题。那我们就通过java中的Semaphore来看一下。
信号量只能保证资源的唯一性,而不能保证线程的安全,所以和synchonized等多线程安全还是有区别的。

类图

semaphare 上图中,我们日常使用的是semaphore,其中封装了FairSync和NonfairSync,而这两个子类也是继承自sync,sync又继承自AQS,所以从上到下的逻辑。

日常使用

// 创建
Semaphore sm = new Semaphore(1);
sm.acquire();
sm.release();

构造函数

    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

根据传入参数的不同,会创建不同的sync,公平锁和非公平锁,公平锁和非公平锁的区别是什么呢?我们先抛出结论,这样在后面看到的时候就能加深代码理解:公平锁就是在申请的时候放在队列的末尾,这样所有申请者都是按照顺序来的,所以是公平的。而非公平锁则是在申请的时候,会立即查看当前是否可以申请到,如果可以,那么就进行执行。可以理解为,每次新任务都会进行一个插队操作,成功了就可以执行。因为这个插队操作,所以才是非公平的。 由类图可以知道,FairSync和NonfairSync都是集成自Sync,而Sync又是继承自AQS。

    /**
     * NonFair version
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires); // 实现在Sync中
        }
    }

    /**
     * Fair version
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) { // 自己对方法进行了实现
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

上面可以看到,虽然两者都从Sync继承过来,但是fair中对tryacquireshared进行了实现,而NonFair并未对此进行实现,而是父类Sync中对这个进行了实现。

    abstract static class Sync extends AbstractQueuedSynchronizer {
        final int nonfairTryAcquireShared(int acquires) { // NonFair的实现。
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
}

从nonFair和Fair的实现,可以看到,都是不停的循环,同时尝试CAS操作,如果操作成功,则返回,否则会一直尝试。而Fair中的tryAcquireShared则是多了一个hasQueuePredecessors,此处也就是公平锁的由来,如果当前队列中有线程比当前线程等待的更久,那么就不再进行尝试。

acquire

好的,让我们来看到acquire方法。

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

尝试获取资源,而acquiresharedinterruptibly是在父类AQS中的,如果无法获取到资源,则进行doacquiresharedinterruptibly.

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED); // 添加进队列
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

好的,到了这里,我们暂时先不往下看,我们回头来看release的源码。

release

    public void release() {
        sync.releaseShared(1);
    }

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

上方代码,可以看到,同样是采用了CAS自旋锁的方式来进行操作,至于这个compareAndSetState方法,是一个native的方法,基于底层的原子操作,这个我们后面的文章再谈。
现在我们回顾一下:我们通过了信号量Semaphore的使用方式,看到了cas上层的一个初始模型,简单理解就是:通过CAS的方式,来对资源的限制进行加减操作,从而来控制资源的使用量。那么下一个问题,就又出来了,如果获取不到,或者竞争状态下的情况是什么样的呢?我们继续看一下 doAcquireSharedInterruptibly的源码。

doAcquireSharedInterruptibly

上面在申请的时候已经说到了会添加进队列,这个后面再提。然后创建一个标志位,用来标识是否获取成功。我们重点关注shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }

可以看到,也是通过unsafe这个类来进行的底层操作,这块我们后面文章会具体讲解底层的原理。

总结一下

信号量Semaphore的原理,其实就是利用底层的cas自旋锁来进行state的设置,如果获取不到state,则会将当前进程放入等待队列中。而具体的底层如何等待,又是当有了资源之后如何唤醒,我们下个文章分析。