博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java集合,ConcurrentLinkedQueue源码解析(常用于并发编程)
阅读量:6032 次
发布时间:2019-06-20

本文共 9109 字,大约阅读时间需要 30 分钟。

  hot3.png

本文中多次提到CAS算法,先做个CAS算法的简单描述

CAS(非阻塞算法)说明

CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。 

ConcurrentLinkedQueue是一种线程安全的队列。他是使用非阻塞算法(CAS)来实现线程安全的。ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部;当我们获取一个元素时,它会返回队列头部的元素。

ConcurrentLinkedQueue不变式

在后面的源代码分析中,我们将会看到队列有时会处于不一致状态。为此,ConcurrentLinkedQueue 使用三个不变式 ( 基本不变式,head 的不变式和 tail 的不变式 ),来约束队列中方法的执行。通过这三个不变式来维护非阻塞算法的正确性

基本不变式

在执行方法之前和之后,队列必须要保持的不变式: 

当入队插入新节点之后,队列中有一个 next 域为 null 的(最后)节点。 
从 head 开始遍历队列,可以访问所有 item 域不为 null 的节点。

head 的不变式和可变式

在执行方法之前和之后,head 必须保持的不变式: 

所有“活着”的节点(指未删除节点),都能从 head 通过调用 succ() 方法遍历可达。 
head 不能为 null。 
head 节点的 next 域不能引用到自身。 
在执行方法之前和之后,head 的可变式: 
head 节点的 item 域可能为 null,也可能不为 null。 
允许 tail 滞后(lag behind)于 head,也就是说:从 head 开始遍历队列,不一定能到达 tail。

tail 的不变式和可变式

在执行方法之前和之后,tail 必须保持的不变式: 

通过 tail 调用 succ() 方法,最后节点总是可达的。 
tail 不能为 null。 
在执行方法之前和之后,tail 的可变式: 
tail 节点的 item 域可能为 null,也可能不为 null。 
允许 tail 滞后于 head,也就是说:从 head 开始遍历队列,不一定能到达 tail。 
tail 节点的 next 域可以引用到自身。 
在接下来的源代码分析中,在初始化 ConcurrentLinkedQueue 之后及调用入队 / 出队方法之前和之后,我们都会参照上面三个不变式来分析它们的正确性。

数据结构

public class ConcurrentLinkedQueue
extends AbstractQueue
implements Queue
, java.io.Serializable

ConcurrentLinkedQueue由head节点和tail节点组成,每个节点(Node)由节点元素(item)和指向下一个节点(next)的引用组成,节点与节点之间就是通过这个next关联起来,从而组成一张链表结构的队列。默认情况下head节点存储的元素为空,tail节点等于head节点。

节点类实现及队列初始化

节点类定义

ConcurrentLinkedQueue 是用节点链接成的链表来实现的。首先,让我们来看看节点类的源代码

private static class Node
{ private volatile E item; // 声明为 volatile 型 private volatile Node
next; // 声明为 volatile 型 Node(E item) { // 创建新节点 lazySetItem(item); // 惰性设置 item 域的值 } E getItem() { return item; } boolean casItem(E cmp, E val) { // 使用 CAS 指令设置 item 域的值 return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void setItem(E val) { // 使用“volatile 写”的方式,设置 item 域的值 item = val; } voidlazySetItem(E val) { //惰性设置 item 域的值 UNSAFE.putOrderedObject(this, itemOffset, val); } void lazySetNext(Node
val) { // 惰性设置 next 域的值 UNSAFE.putOrderedObject(this, nextOffset, val); } Node
getNext() { return next; } boolean casNext(Node
cmp, Node
val) { //CAS 设置 next 域的值 return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } private static final sun.misc.Unsafe UNSAFE= // 域更新器sun.misc.Unsafe.getUnsafe(); private static final long nextOffset= //next 域的偏移量objectFieldOffset(UNSAFE, "next", Node.class); private static final long itemOffset= //item 域的偏移量objectFieldOffset(UNSAFE, "item", Node.class); }

在 ConcurrentLinkedQueue 的实际应用中,会频繁分配大量生命周期短暂的节点对象。为了降低开销,Node 类的 item 域和 next 域被声明为普通的 volatile 类型。它们通过原子引用域更新器(AtomicReferenceFieldUpdater),使用反射来更新

节点类型说明

有效节点:从 head 向后遍历可达的节点当中,item 域不为 null 的节点。 

无效节点:从 head 向后遍历可达的节点当中,item 域为 null 的节点。 
以删除节点:从 head 向后遍历不可达的节点。 
哨兵节点:链接到自身的节点(哨兵节点同时也是以删除节点)。 
头节点:队列中的第一个有效节点(如果有的话)。 
尾节点:队列中 next 域为 null 的节点(可以是无效节点)。 
如下图所示: 

120750_eTlk_2927759.png

对比 head 的不变式和 tail 的不变式可以看出,head 只能指向有效节点和无效节点,而 tail 可以指向任意节点,包括以删除节点和哨兵节点。在 ConcurrentLinkedQueue 中,入队时只能把新节点链接到尾节点的后面,出队时只能删除头节点

入队列

入队列过程

入队列就是将入队节点添加到队列的尾部,在 ConcurrentLinkedQueue 中,插入新节点时,不用考虑尾节点是否为有效节点,直接把新节点插入到尾节点的后面即可。由于 tail 可以指向任意节点,所以入队时必须先通过 tail 找到尾节点,然后才能执行插入操作。如果插入不成功(说明其他线程已经抢先插入了一个新的节点)就继续向后推进。重复上述迭代过程,直到插入成功为止。 

下面让我们看看入队的源码

public boolean offer(E e) {        if (e == null)            throw new NullPointerException();        // 入队前创建一个入队新节点        Node
n = new Node
(e); // 创建新节点 // 死循环,入队不成功反复入队。 retry: for (;;) { // 创建一个指向tail节点的引用 Node
t = tail; // p用来表示队列的尾节点,默认情况下等于tail节点 Node
p = t; for (int hops = 0;; hops++) { // 找到 tail 的下一个节点 next Node
next = succ(p); // next节点不为空,说明p不是尾节点,需要更新p后在将它指向next节点 if (next != null) { // 如果已经至少越过了两个节点,且 tail 被修改 (tail 被修改,说明其他线程向队列添加了新的节点,且更新 tail 成功 ), // 并且当前节点还是不等于尾节点 if (hops > HOPS && t != tail) // 跳出内外两层循环,重新开始迭代(因为 tail 刚刚被其他线程更新了) continue retry; // B2 // 向后推进到下一个节点 p = next; // B3 // 如果p是尾节点,则设置p节点的next节点为入队节点 } else if (p.casNext(null, n)) { // C // 如果tail节点有大于等于1个next节点,则将入队节点设置成tail节点, // 更新失败了也没关系,因为失败了表示有其他线程成功更新了tail节点 if (hops >= HOPS) // C1 // 使用 CAS 原子指令更新 tail 指向这个新插入的节点,允许失败 casTail(t, n); // C2 return true; // C3 } else {// p有next节点,表示p的next节点是尾节点,则重新设置p节点 p = succ(p); // D1 } } } }

从源代码角度来看,整个入队过程主要做两件事情:第一是定位出尾节点;第二是使用CAS算法将入队节点设置成尾节点的next节点,如不成功则重试

定位尾节点

tail节点并不总是尾节点,所以每次入队都必须先通过tail节点来找到尾节点。尾节点可能是tail节点,也可能是tail节点的next节点。代码中循环体中的第一个if就是判断tail是否有next节点,有则表示next节点可能是尾节点。获取tail节点的next节点需要注意的是p节点等于p的next节点的情况,只有一种可能就是p节点和p的next节点都等于空,表示这个队列刚初始化,正准备添加节点,所以需要返回head节点 

获取p节点的next节点源码如下:

/**     * Returns the successor of p, or the first node if p.next has been     * linked to self, which will only be true if traversing with a     * stale pointer that is now off the list.     */    final Node
succ(Node
p) { // TODO: should we skip deleted nodes here? Node
q = p.next; return (p == q) ? first() : q; }

下面针对tail可能出现的情况分别通过图示说明 

1、tail指向尾节点 

120047_UOcH_2927759.png

开始时,tail 指向 D 节点,首先寻找 D 节点的后继节点。由于 D 的后继节点为 null,所以插入新节点到 D 节点的后面。如果插入成功就退出方法;如果插入失败(说明其他线程刚刚插入了一个新节点),就向后推进到新插入的节点,然后重新开始迭代。下图是插入成功后的示意图: 

120111_HJbR_2927759.png

在上图中,由于 tail 滞后于尾节点的节点数还没有达到 HOPS 指定的阈值,所以 tail 没有被更新。 

2、tail指向非尾节点 

120141_iGhe_2927759.png

开始时,tail 指向 C 节点。首先找到 C 的后继节点 D,然后向后推进到节点 D,后面代码执行路径与上面的“tail 指向尾节点 ”的代码执行路径相同。下图是插入成功后的结构示意图:

120212_XVg8_2927759.png

上图中的 tail 更新了位置。因为在添加 E 节点后,tail 滞后的节点数达到了 HOPS 指定的阈值。这触发执行更新 tail 的 CAS 操作。

private static final int HOPS = 1;

3、tail滞后于head 

120258_zIeF_2927759.png

开始时,tail 指向 A 节点。首先找到 A 的后继节点 B,然后向后推进到节点 B。由于 B 是哨兵节点,产生跳转动作,跳过 C 节点,从 head 指向的 D 节点开始继续向后遍历。后面的代码执行路径与“tail 指向非尾节点”相同。下面是成功插入一个新节点后的结构示意图 

120350_pZbB_2927759.png

出队列

出队列的就是从队列里返回一个节点元素,并清空该节点对元素的引用。并不是每次出队时都更新head节点,当head节点里有元素时,直接弹出head节点里的元素,而不会更新head节点。只有当head节点里没有元素时,出队操作才会更新head节点。这种做法也是通过hops变量来减少使用CAS更新head节点的消耗,从而提高出队效率。 

下面我看下出队的源码实现:

public E poll() {         Node
h = head; // p表示头节点,需要出队的节点 Node
p = h; for (int hops = 0; ; hops++) { // 获取p节点的元素 E item = p.getItem(); // 如果p节点的元素不为空,使用CAS设置p节点引用的元素为null if (item != null && p.casItem(item, null)) { // 如果迭代过程已经越过了不小于 1 个节点也就是 if (hops >= HOPS) { // 将p节点下一个节点设置成head节点 Node
q = p.getNext(); // 如果 q 不为 null,设置 head 指向后继节点 q;否则设置 head 指向当前节点 p(此时队列为空,只有一个伪节点 p) updateHead(h, (q != null) ? q : p); } // 返回被移除节点 item 域的值 return item; } // 如果头节点的元素为空或头节点发生了变化,这说明头节点已经被另外 // 一个线程修改了。那么获取p节点的下一个节点 Node
next = succ(p); // 如果p的下一个节点也为空,说明这个队列已经空了 if (next == null) { // 设置 head 指向 p 节点(此时队列为空,只有一个伪节点 p) updateHead(h, p); // 退出循环 break; } // 如果下一个元素不为空,则将头节点的下一个节点设置成头节点 p = next; } return null; }

首先获取头节点的元素,然后判断头节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走,如果不为空,则使用CAS的方式将头节点的引用设置成null,如果CAS成功,则直接返回头节点的元素,如果不成功,表示另外一个线程已经进行了一次出队操作更新了head节点,导致元素发生了变化,需要重新获取头节点。

头节点定位

根据 head 的不变式和可变式,在执行出队操作前,head 在队列中的位置共有两种可能: 

head 指向有效节点。 
head 指向无效节点。 
1、head指向有效节点 

120454_j0qe_2927759.png

出队时,首先取得 head 指向的 A 节点的 item 域的值,然后通过 CAS 设置 A 节点 item 域的值为 null。如果成功,由于此时越过的节点数为 0,所以直接返回 A 节点 item 域原有的值。如果不成功,说明其他线程已经抢先删除了该节点,此时向后推进到 B 节点。重复这个过程,直到成功删除一个节点;如果遍历完队列也没有删除成功,则返回 null。下面是成功删除后的结构示意图: 

120520_VpSn_2927759.png

在上图中,虽然 A 节点被设置成无效节点,但 head 依然指向它,因为删除操作越过的节点数还没有达到 HOPS 指定的阀值。 

接下来,让我们来看看第二种情形的结构示意图 
2、head指向无效节点 

120549_iflm_2927759.png

首先获得 head 指向节点的 item 域的值,由于为 null,所以向后推进到 B 节点。获得 B 节点 item 域的值后,通过 CAS 设置该值为 null。如果成功,由于已经达到 HOPS 指定的阀值,触发执行 head 更新。如果不成功(说明其他线程已经抢先删除了 B 节点),继续向后推进到 C 节点。重复这个过程,直到删除一个有效节点。如果遍历完队列也没有删除成功,则返回 null。下图是成功删除后的结构示意图: 

120614_jvNH_2927759.png

 

从上图我们可以看到,在执行删除操作过程中,head 越过的节点数达到阀值,触发执行 head 的更新,使它指向 C 节点。 

从上面删除头节点后的两个结构示意图可以看出,执行出队操作后的队列依然满足三个不变式。

总结

ConcurrentLinkedQueue 的非阻塞算法实现非常精巧,也非常复杂。它使用 CAS 原子指令来处理对数据的并发访问。同时,它允许队列处于不一致状态。这个特性分离了入队 / 出队操作中包含的两个需要一起原子执行的步骤,从而有效的缩小了入队 / 出队时的原子化(更新值的)范围为唯一变量。由于队列可能处于不一致状态,为此 ConcurrentLinkedQueue 使用三个不变式来维护非阻塞算法的正确性。 

参考书籍: 

《Java并发编程》、JDK源码

 

转载于:https://my.oschina.net/90888/blog/1624935

你可能感兴趣的文章
如何高效学习和工作(撸代码)
查看>>
驻波比
查看>>
探讨神奇的需求变更:程序员遭遇车祸成植物人,需求变更将其唤醒
查看>>
Basic INFO - InstallShield制作的安装包如何进行平台过滤
查看>>
oc 中数据包装nsdate nsvalue nsnuber的用法
查看>>
一些优秀软件收藏
查看>>
LoadRunner监控Linux服务器
查看>>
ASP.NET MVC案例教程(基于ASP.NET MVC beta)——第六篇:拦截器
查看>>
rc.local自启动学习
查看>>
EFCodeFirst系列
查看>>
eclipse开启和去掉代码上面的快速导航栏(Toggle Breadcrumb)的方法
查看>>
javascript中的命名规则和方法(转)
查看>>
常用正则表达式
查看>>
nullnullAndroid Interface Definition Language (AIDL) 接口描述语言
查看>>
使你更有思想的20本书
查看>>
java jni 编程
查看>>
Android项目 手机安全卫士(代码最全,注释最详细)之七 应用程序的更新安装...
查看>>
paip.输出内容替换在Apache 过滤器filter的设置
查看>>
hdu 1009:FatMouse' Trade(贪心)
查看>>
蓝桥杯 入门训练 Fibonacci数列(水题,斐波那契数列)
查看>>