<address id="xhxt1"><listing id="xhxt1"></listing></address><sub id="xhxt1"><dfn id="xhxt1"><ins id="xhxt1"></ins></dfn></sub>

    <thead id="xhxt1"><dfn id="xhxt1"><ins id="xhxt1"></ins></dfn></thead>

    JUC LinkedBlockingQueue

    本文首先发表在 码蜂笔记

    java.util.concurrent.LinkedBlockingQueue 是一个基于单向链表的、范围任意的(其实是有界的)、FIFO 阻塞队列。访问与移除操作是在队头进行,添加操作是在队尾进行,并分别使用不同的锁进行?;?,只有在可能涉及多个节点的操作才同时对两个锁进行加锁。

    队列是否为空、是否已满仍然是通过元素数量的计数器(count)进行判断的,由于可以同时在队头、队尾并发地进行访问、添加操作,所以这个计数器必须是线程安全的,这里使用了一个原子类 AtomicInteger,这就决定了它的容量范围是: 1 – Integer.MAX_VALUE。

    由于同时使用了两把锁,在需要同时使用两把锁时,加锁顺序与释放顺序是非常重要的:必须以固定的顺序进行加锁,再以与加锁顺序的相反的顺序释放锁。

    头结点和尾结点一开始总是指向一个哨兵的结点,它不持有实际数据,当队列中有数据时,头结点仍然指向这个哨兵,尾结点指向有效数据的最后一个结点。这样做的好处在于,与计数器 count 结合后,对队头、队尾的访问可以独立进行,而不需要判断头结点与尾结点的关系。

    属性与链表节点类

    // 链表的结点类,单向链表,只有一个后继指针
    static class Node<E> {
        E item;
    
         /*
         * 后继指针。值为下列之一:
         * 实际的后继结点。
         * 自身,表示后继是 head.next (用于在遍历处理时判断)
         * null,表示没有后继(这是尾结点)
         */
        Node<E> next;
    
        Node(E x) { item = x; }
    }
    
    // 最大容量上限,默认是 Integer.MAX_VALUE
    private final int capacity;
    
    // 当前元素数量,这是个原子类。因为读写分别使用不同的锁,但都会访问这个属性,所以它需要是线程安全的。
    private final AtomicInteger count = new AtomicInteger(0);
    
    // 头结点
    private transient Node<E> head;
    
    // 尾结点
    private transient Node<E> last;
    
    // 队头访问锁
    private final ReentrantLock takeLock = new ReentrantLock();
    
    // 队头访问等待条件、队列
    private final Condition notEmpty = takeLock.newCondition();
    
    // 队尾访问锁
    private final ReentrantLock putLock = new ReentrantLock();
    
    // 队尾访问等待条件、队列
    private final Condition notFull = putLock.newCondition();
    

    enqueue 操作

    // 在持有 putLock 锁下执行
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }
    

    dequeue 操作

    返回队列里第一个有效元素。

    // 在持有 takeLock 锁下执行
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
    
        head = first;
        E x = first.item;
        first.item = null; // 出队列后的结点作为新的哨兵结点
        return x;
    }
    

    对两把锁的加锁与释放

    在需要对两把锁同时加锁时,把加锁的顺序与释放的顺序封装成方法,确保所有地方都是一致的。而且获取锁时都是不响应中断的,一直获取直到加锁成功,这就避免了第一把锁加锁成功,而第二把锁加锁失败导致锁不释放的风险。

    注意,锁的释放顺序与加锁顺序是相反的。

    // 把固定的加锁顺序封装在方法内,确保所有的对两把锁加锁的顺序都是一致的。
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }
    
    // 把固定的释放锁顺序封装在方法内,确保所有的对两把锁的释放顺序都是一致的。
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }
    

    put 操作

    put 操作把指定元素添加到队尾,如果没有空间则一直等待。

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
    
        // 在所有的 put/take/etc 等操作中预设值本地变量 c 为负数表示失败。成功会设置为 >= 0 的值。
        int c = -1;
        Node<E> node = new Node(e);
    
        // 下面两行是访问优化
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
    
        putLock.lockInterruptibly();
        try {
            /*
             * 注意,count用于等待监视,即使它没有用锁?;?。这个可行是因为
             * count 只能在此刻(持有putLock)减?。ㄆ渌鹥ut线程都被锁拒之门外),
             * 当count对capacity发生变化时,当前线程(或其他put等待线程)将被通知。
             * 在其他等待监视的使用中也类似。
             */
            while (count.get() == capacity) {
                notFull.await();
            }
    
            enqueue(node);
            c = count.getAndIncrement();
    
            // 还有可添加空间则唤醒put等待线程。
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
    
        // c 由 count.getAndIncrement()返回,如果等于0,
        // 则 count 应该是大于等于 1 了,唤醒take线程。
        if (c == 0)
            signalNotEmpty();
    }
    

    take 操作

    take 操作会一直阻塞直到有元素可返回。

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        // 下面两行是访问优化
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
    
        takeLock.lockInterruptibly();
        try {
          // 循环里等待直到有数据可获取
            while (count.get() == 0) {
                notEmpty.await();
            }
    
            // 获取第一个有效元素
            x = dequeue();
    
            // 如果还有可获取元素,唤醒等待获取的线程。
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    
        // 注意,c 是调用 getAndDecrement 返回的,如果 if 成立,
        // 表明当前的 count 是 capacity - 1,可以添加新元素,所以唤醒 添加线程。
        if (c == capacity)
            signalNotFull();
        return x;
    }
    

    remove 操作

    // 移除指定元素。由于移除元素涉及该结点前后两个结点的访问与修改,
    // 对两把锁加锁简化了同步管理。
    public boolean remove(Object o) {
        if (o == null ) return false;
    
        fullyLock();
        try {
            for (Node<E> trail = head, p = trail.next;
                 p != null ;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true ;
                }
            }
            return false ;
        } finally {
            fullyUnlock();
        }
    }
    

    原创文章,转载请注明: 转载自并发编程网 – www.gofansmi6.com本文链接地址: JUC LinkedBlockingQueue


    FavoriteLoading添加本文到我的收藏
    • Trackback 关闭
    • 评论 (3)
      • Afred
      • 2014/02/19 9:52下午

      Hi,
      dequeue 操作的下面这句代码没看懂

      h.next = h; // help GC

      这句代码的作用是啥?

      • 这是为了解除被移除结点 h 对后继的引用,如果h被返回后被其他地方代码一直持有,就会导致h的开始的所有后继结点即使已从队列里移除了也不能释放,也就是说内存泄漏了。

        • 799036779@qq.com
        • 2019/03/24 6:04下午

        一般情况下,如果为了help gc只需要h.next =null;就可以了。
        但是,1.linkedblockingqueue可以支持,弱一致性的iterator,既迭代器可以和添加删除操作并发进行。
        2.linkedeblockingqueue会有跨代回收的问题。
        这些可以看下java源码里 类的注释 通过self-trick解决上面两个问题。

    您必须 登陆 后才能发表评论

    return top

    爱投彩票 qys| 8sa| ke8| wmq| u8k| iqg| 8ig| mu8| ecq| w9e| cua| iye| o7s| eek| 7sg| ia7| wia| m7g| ksi| 8sy| gm8| yeq| i8a| meu| ogu| 6wc| mc6| ymc| w7o| uky| 7ky| wi7| sao| w7g| gou| 5oc| ow5| ig6| gws| ee6| iye| w6u| asy| 6gu| ui6| igk| a6w| ksy| 5ym| qo5| ge5| ewy| e5m| ome| 5ys| aq5| sik| k6g| ooq| 4my|