<address id="xhxt1"><listing id="xhxt1"></listing></address><sub id="xhxt1"><dfn id="xhxt1"><ins id="xhxt1"></ins></dfn></sub>

    <thead id="xhxt1"><dfn id="xhxt1"><ins id="xhxt1"></ins></dfn></thead>

    Java并发包中的同步队列SynchronousQueue实现原理

    作者:一粟

    介绍

    Java 6的并发编程包中的SynchronousQueue是一个没有数据缓冲的BlockingQueue,生产者线程对其的插入操作put必须等待消费者的移除操作take,反过来也一样。

    不像ArrayBlockingQueue或LinkedListBlockingQueue,SynchronousQueue内部并没有数据缓存空间,你不能调用peek()方法来看队列中是否有数据元素,因为数据元素只有当你试着取走的时候才可能存在,不取走而只想偷窥一下是不行的,当然遍历这个队列的操作也是不允许的。队列头元素是第一个排队要插入数据的线程,而不是要交换的数据。数据是在配对的生产者和消费者线程之间直接传递的,并不会将数据缓冲数据到队列中??梢哉庋蠢斫猓荷吆拖颜呋ハ嗟却苑?,握手,然后一起离开。

    SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。

    实现原理

    阻塞队列的实现方法有许多:

    阻塞算法实现

    阻塞算法实现通常在内部采用一个锁来保证多个线程中的put()和take()方法是串行执行的。采用锁的开销是比较大的,还会存在一种情况是线程A持有线程B需要的锁,B必须一直等待A释放锁,即使A可能一段时间内因为B的优先级比较高而得不到时间片运行。所以在高性能的应用中我们常常希望规避锁的使用。

    public class NativeSynchronousQueue<E> {
        boolean putting = false;
        E item = null;
    
        public synchronized E take() throws InterruptedException {
            while (item == null)
                wait();
            E e = item;
            item = null;
            notifyAll();
            return e;
        }
    
        public synchronized void put(E e) throws InterruptedException {
            if (e==null) return;
            while (putting)
                wait();
            putting = true;
            item = e;
            notifyAll();
            while (item!=null)
                wait();
            putting = false;
            notifyAll();
        }
    }
    

    信号量实现

    经典同步队列实现采用了三个信号量,代码很简单,比较容易理解:

    public class SemaphoreSynchronousQueue<E> {
        E item = null;
        Semaphore sync = new Semaphore(0);
        Semaphore send = new Semaphore(1);
        Semaphore recv = new Semaphore(0);
    
        public E take() throws InterruptedException {
            recv.acquire();
            E x = item;
            sync.release();
            send.release();
            return x;
        }
    
        public void put (E x) throws InterruptedException{
            send.acquire();
            item = x;
            recv.release();
            sync.acquire();
        }
    }
    

    在多核机器上,上面方法的同步代价仍然较高,操作系统调度器需要上千个时间片来阻塞或唤醒线程,而上面的实现即使在生产者put()时已经有一个消费者在等待的情况下,阻塞和唤醒的调用仍然需要。

    Java 5实现

    public class Java5SynchronousQueue<E> {
        ReentrantLock qlock = new ReentrantLock();
        Queue waitingProducers = new Queue();
        Queue waitingConsumers = new Queue();
    
        static class Node extends AbstractQueuedSynchronizer {
            E item;
            Node next;
    
            Node(Object x) { item = x; }
            void waitForTake() { /* (uses AQS) */ }
               E waitForPut() { /* (uses AQS) */ }
        }
    
        public E take() {
            Node node;
            boolean mustWait;
            qlock.lock();
            node = waitingProducers.pop();
            if(mustWait = (node == null))
               node = waitingConsumers.push(null);
             qlock.unlock();
    
            if (mustWait)
               return node.waitForPut();
            else
                return node.item;
        }
    
        public void put(E e) {
             Node node;
             boolean mustWait;
             qlock.lock();
             node = waitingConsumers.pop();
             if (mustWait = (node == null))
                 node = waitingProducers.push(e);
             qlock.unlock();
    
             if (mustWait)
                 node.waitForTake();
             else
                node.item = e;
        }
    }
    

    Java 5的实现相对来说做了一些优化,只使用了一个锁,使用队列代替信号量也可以允许发布者直接发布数据,而不是要首先从阻塞在信号量处被唤醒。

    Java6实现

    Java 6的SynchronousQueue的实现采用了一种性能更好的无锁算法 — 扩展的“Dual stack and Dual queue”算法。性能比Java5的实现有较大提升。竞争机制支持公平和非公平两种:非公平竞争模式使用的数据结构是后进先出栈(Lifo Stack);公平竞争模式则使用先进先出队列(Fifo Queue),性能上两者是相当的,一般情况下,Fifo通??梢灾С指蟮耐掏铝?,但Lifo可以更大程度的保持线程的本地化。

    代码实现里的Dual Queue或Stack内部是用链表(LinkedList)来实现的,其节点状态为以下三种情况:

    1. 持有数据 – put()方法的元素
    2. 持有请求 – take()方法

    这个算法的特点就是任何操作都可以根据节点的状态判断执行,而不需要用到锁。

    其核心接口是Transfer,生产者的put或消费者的take都使用这个接口,根据第一个参数来区别是入列(栈)还是出列(栈)。

     /**
         * Shared internal API for dual stacks and queues.
         */
        static abstract class Transferer {
            /**
             * Performs a put or take.
             *
             * @param e if non-null, the item to be handed to a consumer;
             *          if null, requests that transfer return an item
             *          offered by producer.
             * @param timed if this operation should timeout
             * @param nanos the timeout, in nanoseconds
             * @return if non-null, the item provided or received; if null,
             *         the operation failed due to timeout or interrupt --
             *         the caller can distinguish which of these occurred
             *         by checking Thread.interrupted.
             */
            abstract Object transfer(Object e, boolean timed, long nanos);
        }
    

    TransferQueue实现如下(摘自Java 6源代码),入列和出列都基于Spin和CAS方法:

         /**
             * Puts or takes an item.
             */
            Object transfer(Object e, boolean timed, long nanos) {
                /* Basic algorithm is to loop trying to take either of
                 * two actions:
                 *
                 * 1. If queue apparently empty or holding same-mode nodes,
                 *    try to add node to queue of waiters, wait to be
                 *    fulfilled (or cancelled) and return matching item.
                 *
                 * 2. If queue apparently contains waiting items, and this
                 *    call is of complementary mode, try to fulfill by CAS'ing
                 *    item field of waiting node and dequeuing it, and then
                 *    returning matching item.
                 *
                 * In each case, along the way, check for and try to help
                 * advance head and tail on behalf of other stalled/slow
                 * threads.
                 *
                 * The loop starts off with a null check guarding against
                 * seeing uninitialized head or tail values. This never
                 * happens in current SynchronousQueue, but could if
                 * callers held non-volatile/final ref to the
                 * transferer. The check is here anyway because it places
                 * null checks at top of loop, which is usually faster
                 * than having them implicitly interspersed.
                 */
    
                QNode s = null; // constructed/reused as needed
                boolean isData = (e != null);
    
                for (;;) {
                    QNode t = tail;
                    QNode h = head;
                    if (t == null || h == null)         // saw uninitialized value
                        continue;                       // spin
    
                    if (h == t || t.isData == isData) { // empty or same-mode
                        QNode tn = t.next;
                        if (t != tail)                  // inconsistent read
                            continue;
                        if (tn != null) {               // lagging tail
                            advanceTail(t, tn);
                            continue;
                        }
                        if (timed &amp;&amp; nanos &lt;= 0)        // can't wait
                            return null;
                        if (s == null)
                            s = new QNode(e, isData);
                        if (!t.casNext(null, s))        // failed to link in
                            continue;
    
                        advanceTail(t, s);              // swing tail and wait
                        Object x = awaitFulfill(s, e, timed, nanos);
                        if (x == s) {                   // wait was cancelled
                            clean(t, s);
                            return null;
                        }
    
                        if (!s.isOffList()) {           // not already unlinked
                            advanceHead(t, s);          // unlink if head
                            if (x != null)              // and forget fields
                                s.item = s;
                            s.waiter = null;
                        }
                        return (x != null)? x : e;
    
                    } else {                            // complementary-mode
                        QNode m = h.next;               // node to fulfill
                        if (t != tail || m == null || h != head)
                            continue;                   // inconsistent read
    
                        Object x = m.item;
                        if (isData == (x != null) ||    // m already fulfilled
                            x == m ||                   // m cancelled
                            !m.casItem(x, e)) {         // lost CAS
                            advanceHead(h, m);          // dequeue and retry
                            continue;
                        }
    
                        advanceHead(h, m);              // successfully fulfilled
                        LockSupport.unpark(m.waiter);
                        return (x != null)? x : e;
                    }
                }
            }
    

    参考文章

    1. Javadoc of SynchronousQueue
    2. Scalable Synchronous Queues
    3. Nonblocking Concurrent Data Structures with Condition Synchronization

    原创文章,转载请注明: 转载自并发编程网 – www.gofansmi6.com本文链接地址: Java并发包中的同步队列SynchronousQueue实现原理


    FavoriteLoading添加本文到我的收藏
    • Trackback 关闭
    • 评论 (6)
      • 匿名
      • 2014/06/17 1:36上午

      LockSupport.unpark(m.waiter);

      • 匿名
      • 2014/06/17 1:37上午

      匿名 :
      LockSupport.unpark(m.waiter);

      代码中有解锁,我认为这个并不是无锁算法.
      jdk中无锁应该是ConcurrentLinkedQueue

        • qints
        • 2015/12/09 2:07下午

        我感觉也是,明明加锁了。
        else if (!timed)
        LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
        LockSupport.parkNanos(this, nanos);
        以上代码来自awaitFulfill方法。

          • fdx321
          • 2016/11/22 1:37下午

          这段代码不是锁吧。这段代码主要是用来阻塞线程。通常线程被阻塞后便进入内核(Linux)调度状态,会导致在用户态与内核态之间来回切换,影响锁的性能,所以引入了自旋(spin,其实就是一段啥都不做的循环),在自旋一定时间后(这个时间一般是个经验值,一般和CPU核数相关),如果还没有获得想要的数据,再进入阻塞。 最后,线程阻塞 和 锁 不是一个概念。

      • xygood
      • 2017/03/11 12:48上午

      信号量实现中的“sync”看起来是冗余的,去掉这个信号量也不影响程序执行吧

        • 799036779@qq.com
        • 2019/02/13 2:45下午

        如果取消的话,一个线程执行完put方法就会直接结束,与同步队列设计不符了。

    您必须 登陆 后才能发表评论

    return top

    爱投彩票 5mc| qc5| kku| e5a| k5a| euk| 4ok| ii4| gei| w4w| kio| 4cm| ig4| uko| s5m| cae| 5yw| 5wu| sm3| ems| q3i| sqw| 3qm| gg4| ckw| ki4| qom| e4q| qgc| 4ue| 2km| ui2| kak| e3y| oow| 3ma| so3| agu| u3y| iqu| 3sy| aq1| wk2| wmy| o2a| igo| 2gs| mc2| wci| m2w| aye| 2kw| ma3| 3gy| uk1| ec1| mug| u1w| ksm| 1ic|