有比 ReadWriteLock更快的锁?( 四 )


/** * 获取读锁,如果写锁被占用,则阻塞 * 注意该方法不响应中断 * @return 返回非0表示成功 */public long readLock() {    long s = state, next;      // 队列为空且读锁未超限    return ((whead == wtail && (s & ABITS) < RFULL && // (s & ABITS) < RFULL表示写锁未占用且读锁数量未超限           U.compareAndSwapLong(this, STATE, s, next = s + RUNIT))?           next : acquireRead(false, 0L));}acquireRead方法非常复杂,用到了大量自旋操作:
/** * 尝试自旋的获取读锁, 获取不到则加入等待队列, 并阻塞线程 * * @param interruptible true 表示检测中断, 如果线程被中断过, 则最终返回INTERRUPTED * @param deadline      如果非0, 则表示限时获取 * @return 非0表示获取成功, INTERRUPTED表示中途被中断过 */private long acquireRead(boolean interruptible, long deadline) {    WNode node = null, p;   // node指向入队结点, p指向入队前的队尾结点    /**     * 自旋入队操作     * 如果写锁未被占用, 则立即尝试获取读锁, 获取成功则返回.     * 如果写锁被占用, 则将当前读线程包装成结点, 并插入等待队列(如果队尾是写结点,直接链接到队尾;否则,链接到队尾读结点的栈中)     */    for (int spins = -1; ; ) {        WNode h;        if ((h = whead) == (p = wtail)) {   // 如果队列为空或只有头结点, 则会立即尝试获取读锁            for (long m, s, ns; ; ) {                if ((m = (s = state) & ABITS) < RFULL ?     // 判断写锁是否被占用                    U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :  //写锁未占用,且读锁数量未超限, 则更新同步状态                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))        //写锁未占用,但读锁数量超限, 超出部分放到readerOverflow字段中                    return ns;          // 获取成功后, 直接返回                else if (m >= WBIT) {   // 写锁被占用,以随机方式探测是否要退出自旋                    if (spins > 0) {                        if (LockSupport.nextSecondarySeed() >= 0)                            --spins;                    } else {                        if (spins == 0) {                            WNode nh = whead, np = wtail;                            if ((nh == h && np == p) || (h = nh) != (p = np))                                break;                        }                        spins = SPINS;                    }                }            }        }        if (p == null) {                            // p == null表示队列为空, 则初始化队列(构造头结点)            WNode hd = new WNode(WMODE, null);            if (U.compareAndSwapObject(this, WHEAD, null, hd))                wtail = hd;        } else if (node == null) {                  // 将当前线程包装成读结点            node = new WNode(RMODE, p);        } else if (h == p || p.mode != RMODE) {     // 如果队列只有一个头结点, 或队尾结点不是读结点, 则直接将结点链接到队尾, 链接完成后退出自旋            if (node.prev != p)                node.prev = p;            else if (U.compareAndSwapObject(this, WTAIL, p, node)) {                p.next = node;                break;            }        }        // 队列不为空, 且队尾是读结点, 则将添加当前结点链接到队尾结点的cowait链中(实际上构成一个栈, p是栈顶指针 )        else if (!U.compareAndSwapObject(p, WCOWAIT, node.cowait = p.cowait, node)) {    // CAS操作队尾结点p的cowait字段,实际上就是头插法插入结点            node.cowait = null;        } else {            for (; ; ) {                WNode pp, c;                Thread w;                // 尝试唤醒头结点的cowait中的第一个元素, 假如是读锁会通过循环释放cowait链                if ((h = whead) != null && (c = h.cowait) != null &&                    U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&                    (w = c.thread) != null) // help release                    U.unpark(w);                if (h == (pp = p.prev) || h == p || pp == null) {                    long m, s, ns;                    do {                        if ((m = (s = state) & ABITS) < RFULL ?                            U.compareAndSwapLong(this, STATE, s,                                ns = s + RUNIT) :                            (m < WBIT &&                                (ns = tryIncReaderOverflow(s)) != 0L))                            return ns;                    } while (m < WBIT);                }                if (whead == h && p.prev == pp) {                    long time;                    if (pp == null || h == p || p.status > 0) {                        node = null; // throw away                        break;                    }                    if (deadline == 0L)                        time = 0L;                    else if ((time = deadline - System.nanoTime()) <= 0L)                        return cancelWaiter(node, p, false);                    Thread wt = Thread.currentThread();                    U.putObject(wt, PARKBLOCKER, this);                    node.thread = wt;                    if ((h != pp || (state & ABITS) == WBIT) && whead == h && p.prev == pp) {                        // 写锁被占用, 且当前结点不是队首结点, 则阻塞当前线程                        U.park(false, time);                    }                    node.thread = null;                    U.putObject(wt, PARKBLOCKER, null);                    if (interruptible && Thread.interrupted())                        return cancelWaiter(node, p, true);                }            }        }    }    for (int spins = -1; ; ) {        WNode h, np, pp;        int ps;        if ((h = whead) == p) {     // 如果当前线程是队首结点, 则尝试获取读锁            if (spins < 0)                spins = HEAD_SPINS;            else if (spins < MAX_HEAD_SPINS)                spins <<= 1;            for (int k = spins; ; ) { // spin at head                long m, s, ns;                if ((m = (s = state) & ABITS) < RFULL ?     // 判断写锁是否被占用                    U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :  //写锁未占用,且读锁数量未超限, 则更新同步状态                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {      //写锁未占用,但读锁数量超限, 超出部分放到readerOverflow字段中                    // 获取读锁成功, 释放cowait链中的所有读结点                    WNode c;                    Thread w;                    // 释放头结点, 当前队首结点成为新的头结点                    whead = node;                    node.prev = null;                    // 从栈顶开始(node.cowait指向的结点), 依次唤醒所有读结点, 最终node.cowait==null, node成为新的头结点                    while ((c = node.cowait) != null) {                        if (U.compareAndSwapObject(node, WCOWAIT, c, c.cowait) && (w = c.thread) != null)                            U.unpark(w);                    }                    return ns;                } else if (m >= WBIT &&                    LockSupport.nextSecondarySeed() >= 0 && --k <= 0)                    break;            }        } else if (h != null) {     // 如果头结点存在cowait链, 则唤醒链中所有读线程            WNode c;            Thread w;            while ((c = h.cowait) != null) {                if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&                    (w = c.thread) != null)                    U.unpark(w);            }        }        if (whead == h) {            if ((np = node.prev) != p) {                if (np != null)                    (p = np).next = node;   // stale            } else if ((ps = p.status) == 0)        // 将前驱结点的等待状态置为WAITING, 表示之后将唤醒当前结点                U.compareAndSwapInt(p, WSTATUS, 0, WAITING);            else if (ps == CANCELLED) {                if ((pp = p.prev) != null) {                    node.prev = pp;                    pp.next = node;                }            } else {        // 阻塞当前读线程                long time;                if (deadline == 0L)                    time = 0L;                else if ((time = deadline - System.nanoTime()) <= 0L)   //限时等待超时, 取消等待                    return cancelWaiter(node, node, false);                Thread wt = Thread.currentThread();                U.putObject(wt, PARKBLOCKER, this);                node.thread = wt;                if (p.status < 0 && (p != h || (state & ABITS) == WBIT) && whead == h && node.prev == p) {                    // 如果前驱的等待状态为WAITING, 且写锁被占用, 则阻塞当前调用线程                    U.park(false, time);                }                node.thread = null;                U.putObject(wt, PARKBLOCKER, null);                if (interruptible && Thread.interrupted())                    return cancelWaiter(node, node, true);            }        }    }}


推荐阅读