<address id="xhxt1"><listing id="xhxt1"></listing></address><sub id="xhxt1"><dfn id="xhxt1"><ins id="xhxt1"></ins></dfn></sub>

    <thead id="xhxt1"><dfn id="xhxt1"><ins id="xhxt1"></ins></dfn></thead>

    告别Disruptor(一) 简洁优雅的高性能并发队列

    几年前听说过Disruptor,一直没用过也没深究, 其号称是一个性能爆表的并发队列,上Github/LMAX-Exchange/disruptor 去看了看,官方性能描述文章 选了慢如蜗牛的ArrayBlockQueue来对比。在Nehalem 2.8Ghz – Windows 7 SP1 64-bit录得性能见后(其中P,C分别代表 Producer和Consumer):

    1P – 1C 的吞吐量两千五百万次,1P – 3C Multicast 就降到了一千万次不到,对比我所认为的非线程安全1P -1C队列亿次每秒的量级,感觉并不强大。亿次每秒的队列加上线程安全,毛估估1P-1C性能减半五千万次每秒,1P-3C 再减少个30%三千五百万次每秒,应该差不多了吧。

    继续读读Disruptor的介绍,整体业务框架先不谈,关于队列的部分,我只想问可以说脏话不,太TMD的复杂了,实现方式一点都不优雅,讲真,我想用100行代码灭了它。

    说干就干,先尝试一下。

    ArrayBlockingQueueDisruptor
    Unicast: 1P – 1C5,339,25625,998,336
    Pipeline: 1P – 3C2,128,91816,806,157
    Sequencer: 3P – 1C5,539,53113,403,268
    Multicast: 1P – 3C1,077,3849,377,871
    Diamond: 1P – 3C2,113,94116,143,613

    Comparative throughput (in ops per sec)

    第一步 简单粗暴的阻塞队列

    先简单来个,这年头用synchronized要被人瞧不起的(风闻现在性能好多了),大家言必谈ReentrantLock,CAS,那么我也赶潮流CAS吧,主流程如下。

    步骤主要工作失败流程
    1无论是put还是take,先对head/tail getAndIncrease 在Array中占位100%成功
    2检查是否能够放/取失败则循环检查,条件允许了再操作(不能返回失败,第1步的占位无法回退)
    3执行放/取操作

    第2步的操作,如果失败,并非完全不能回退,只是需要牵扯到一套复杂的逻辑,在这个简单粗暴的实现中,先不考虑非阻塞方案。

    核心代码如下:

    private Object[] array;
    private final int capacity;
    private final int m;
    
    private final AtomicLong tail;
    private final AtomicLong head;
    private final AtomicLong[] als = new AtomicLong[11];
    
    public RiskBlockingQueue(int preferCapacity) {
    	this.capacity = getPow2Value(preferCapacity, MIN_CAPACITY, MAX_CAPACITY);
    	//这是个取比preferCapacity大的,最接近2的整数幂的函数(限制必须在 MIN MAX 之间)
    	array = new Object[this.capacity];
    	this.m = this.capacity - 1;		
    
    	for (int i = 0; i < als.length; i++) {     // 并不一定100%成功的伪共享padding
    		als[i] = new AtomicLong(0);
    	}
    	head = als[3];
    	tail = als[7];
    }
    
    public void put(T obj) {
    	ProgramError.when(obj == null, "Can't add null into this queue");
    	int p = (int) (head.getAndIncrement() & this.m);
    	int packTime = MIN_PACKTIME_NS;
    	while(array[p] != null) {
    		LockSupport.parkNanos(packTime);
    		if(packTime < MAX_PACKTIME_NS) packTime <<= 1;
    	}
    	array[p] = obj;
    }
    
    public T take(){
    	Object r;
    	int p = (int) (tail.getAndIncrement() & this.m);
    	int packTime = MIN_PACKTIME_NS;
    	while((r = array[p]) == null) {
    		LockSupport.parkNanos(packTime);
    		if(packTime < MAX_PACKTIME_NS) packTime <<= 1;
    	}
    	array[p] = null;
    	return (T)r;
    }

    代码简单的不要不要的,30来行代码,一个线程安全的阻塞就基本完成。什么?你问构造函数为什么叫RiskBlockingQueue?,很简单,有Risk,这并不是一个真正意义上的线程安全Queue,它有风险,那么风险在哪里呢?

    各位看官先自己想想风险在哪里,我先来测个1P-1C 性能数据 (以下数据都在关闭了CPU超线程的环境下测试获得,超线程时数据经??瓷先ズ苊?

    1P – 1C

    Producer 0 has completed. Cost Per Put 19ns.
    Consumer 0 has completed. Cost Per Take 19ns.
    Total 201M I/O, cost 3844ms, 52374243/s(52M/s)

    ??,还真的和预测差不多,性能减半。

    接下来,揭晓这个队列的风险,我们再来看看队列中一个P线程(Producer 线程,Consumer称为C线程,下同)put操作的工作流程

    步骤主要工作
    1getAndIncrease 占位
    2检查是否能够put
    3执行put操作

    假设某线程P0,执行完了第1步,在执行第2或3步时被叫去喝茶了

    这时P0线程本应填充位置array[x]但却没有填充(如果有对应的C线程,也take不到对象,被卡在array[x]上不断pack)。

    但线程P1 – Pn在欢快的继续执行,不断的put,沿着array往前跑,渐渐的,一圈过去了,P1 线程也来到了array[x]的位置。

    这时,P0线程和P1线程对array[x]位置的访问处于竞争状态,array[x] 没有任何锁/同步/信号量/原子操作?;?。这可能会造成对象丢失,并卡住一个C线程,并最终卡住整个队列。C线程们也一样,极端情况下,当一个C线程追上另一个C线程的时候,也会对数组的同一位置发生非线程安全的争用。

    哪里有问题,就解决哪里的问题

    第二步 真正的并发阻塞Array队列

    对于数组,Java没有提供直接的CAS操作方式(除非自己调Unsafe),不但没有CAS,连volatile都没有。不过,Java中有一个内置的类,叫AtomicReferenceArray,简而言之,这个类提供了对 T[] 数组的CAS及类似操作。继续上代码,这次还少了2行。

    private AtomicReferenceArray<T> array;  //原本是 Object[] array
    private final int capacity;
    private final int m;
    private final AtomicLong tail;
    private final AtomicLong head;
    
    public ConcurrentBlockingQueue(int preferCapacity) {
    	this.capacity = getPow2Value(preferCapacity, MIN_CAPACITY, MAX_CAPACITY);
    	array = new AtomicReferenceArray<T>(this.capacity);
    	this.m = this.capacity - 1;		
    
    	for (int i = 0; i < als.length; i++) {
    		als[i] = new AtomicLong(0);
    	}
    	head = als[3];
    	tail = als[7];
    }
    
    public void put(T obj) {
    	ProgramError.when(obj == null, "Queue object can't be null");
    	int p = (int) (head.getAndIncrement() & this.m);
    	int packTime = MIN_PACKTIME_NS;
    	while(!array.compareAndSet(p, null, obj)) { //***
    		LockSupport.parkNanos(packTime);
    		if(packTime < MAX_PACKTIME_NS) packTime <<= 1;
    	}
    }
    
    public T take(){
    	T r;
    	int p = (int) (tail.getAndIncrement() & this.m);
    	int packTime = MIN_PACKTIME_NS;
    	while((r=array.getAndSet(p, null)) == null) {  //***
    		LockSupport.parkNanos(packTime);
    		if(packTime < MAX_PACKTIME_NS) packTime <<= 1;
    	}
    	return r;
    }

    这段代码的核心修改是,每次读写array的位置p,都通过CAS和GAS的原子操作来完成,保证了array的线程安全,在高度争用时,依然可以确保放一个取一个的交替。 接下来继续测试性能。

    1P – 1C

    Consumer 0 has completed. Cost Per Take 29ns.
    Producer 0 has completed. Cost Per Add 29ns.
    Total 201M I/O, cost 5912ms, 34053889/s(34M/s)

    1P – 3C

    Consumer 0 has completed. Cost Per Take 143ns.
    Consumer 2 has completed. Cost Per Take 143ns.
    Producer 0 has completed. Cost Per Put 47ns.
    Consumer 1 has completed. Cost Per Take 143ns.
    Total 201M I/O, cost 9665ms, 20830480/s (20M/s)

    3P – 1C

    Producer 0 has completed. Cost Per Put 185ns.
    Producer 1 has completed. Cost Per Put 185ns.
    Producer 2 has completed. Cost Per Put 185ns.
    Consumer 0 has completed. Cost Per Take 62ns.
    Total 201M I/O, cost 12551ms, 16040681/s(16M/s)

    2P – 2C

    Producer 0 has completed. Cost Per Put 147ns.
    Producer 1 has completed. Cost Per Put 148ns.
    Consumer 0 has completed. Cost Per Take 148ns.
    Consumer 1 has completed. Cost Per Take 148ns.
    Total 201M I/O, cost 14986ms, 13434311/s(13M/s)

    感觉有点沮丧,可能是因为增加的CAS操作,性能进一步下降。

    考虑到我的测试环境里CPU比Disruptor当年的CPU快不少,实际可比性能,1P-1C估计只有Disruptor的1.1倍,1P – 3C的性能,只有其1.5倍???,但是快的及其有限,号称要挑战Disruptor,却仅凭点数小胜,而非直接击倒,不爽。

    具体的性能对比如下:

    Disruptor
    Nehalem 2.8Ghz
    ConcurrentBlockingQueue
    Skylake-X 3.3Ghz
    Unicast: 1P – 1C25,998,33634,053,889
    Pipeline: 1P – 3C16,806,157
    Sequencer: 3P – 1C13,403,268
    Multicast: 3P – 1C16,040,681
    Multicast: 1P – 3C9,377,87120,830,480
    Diamond: 1P – 3C16,143,613
    Multicast: 2P – 2C13,434,311

    问题出在哪里呢?仔细想了一下,可能是过多的变量共享访问及数组的共享访问造成了大量的 L1/2 缓存失效,描述如下。

    1P – 1C时,一旦head和tail靠近或套圈,那么对64字节的数组内存就直接形成了共享争用。

    在指针压缩时,Java的每个引用占用4字节,64字节一条的Cache line 一共有16个引用,两个线程一个放一个取,不断的 L1/2 缓存失效,锁定,修改。

    1P – 3C的时候,还要再加上C与C之间的 tail 争用和 L1/2 缓存争用,然后,性能就下降到这么个结果了。

    还是那句话,哪里有问题,就解决哪里的问题

    但是怎么改呢,想了下:
    1P1C时,一旦队列空了或是满了,很容易陷在Producer/Consumer一个放一个取的过程中,而多P或多C时,多个P/C线程,排着队一个个自增head/tail然后修改共享数组,P线程们争完head之后,集中在共享数组的相邻位置尝试放置对象;C线程们争用tail,再修改共享数组的相邻位置,几乎每次访问都是无效 L1/2 缓存。
    也就是说,在密集争用时,后一个线程正好争得前一个线程取得的位置的后一个的位置(head/tail + 1),然后这两个线程对位于同一个cache line上的相邻的数组位置进行访问,造成缓存失效,性能下降,这是流程上的硬伤,没法改,只能推倒重来。

    3 非阻塞线程安全Array队列

    上一个队列,不但性能不尽人意,而且很难支持非阻塞操作,换个思路,重新设计一个支持非阻塞操作的队列,并尽可能少共享访问内存的情况。流程如下:

    步骤主要工作失败流程
    1检查可能能够放/取的数组位置(读取竞争变量head或tail)不会失败
    2检查是否能够放/取(对应位置是否有对象,反映了队列是否空/满)回步骤1/或返回失败
    3getAndIncrease竞争变量head或tail,竞争该位置的操作权(放/取)回步骤1/或返回失败
    4执行放/取操作失败就循环重试

    先以offer为例看看代码:

    private AtomicReferenceArray<T> array;
    
    public boolean offer(T obj) {
    	if(obj == null) throw new NullPointExceprion("Can't put null object into this queue");
    	long head = this.head.get(); //步骤1
    	int p =(int) (head & this.m);
    	if(array.get(p) != null) return false;   //步骤2
    	if(this.head.compareAndSet(head, head + 1)) {  //步骤3
    		int packTime = MIN_PACKTIME_NS;
    		while(!array.compareAndSet(p, null, obj)) { //步骤4
    			LockSupport.parkNanos(MIN_PACKTIME_NS);
    			if(packTime < MAX_PACKTIME_NS) packTime <<= 1;
    		};
    		return true;
    	}
    	return false;
    }

    这里我们要注意,在offer的第2步,就算if判断时,array.get(p) == null 可以放置对象,但这并不说明,在第4步,位置p仍然为空,因为可能有上一圈甚至再上一圈,不知道为什么停在这里的P线程往array的p位置放置了对象(虽然概率很小)。此时要等到一个C线程将这个对象取出之后,该位置才能被继续放置对象,如果没有C线程来取,将一直等在这里。

    由于此时head已经被getAndIncrease,这又是一个已经占位,无法轻易回退的地方。我们也不可能将array.compareAndSet和head.getAndIncrease两个原子操作,合并成一个原子操作。

    采用不可回退,反复循环尝试的方式,代码虽然工作正常,但存在block较长时间的可能,甚至,在极其极端的情况下(多队列,多线程,复杂流水线且存在特定的处理循环),还有可能引起死锁。

    难道又改成一个不支持非阻塞操作的队列?心有不甘!最好有一个不怎么影响整体性能的方案来实现这里的回退。

    此类的并发冲突是个小概率事件,需要回退的比例很低,回退部分可以性能较差,但正常处理时的性能要尽量不受影响。

    高性能 = 减少真共享 + 消除伪共享 + 降低争用 + …. 但是,怎么写呢? 干到这里时正好是中午,下楼吃饭时边吃边想,然后,在开心的喝着牛肉汤时更开心的把这个问题想通了。 代码如下:

    private final byte[] falseOffer;   //新增
    private final byte[] falsePoll;  //新增
    private final AtomicReferenceArray<T> array;
    final int capacity;
    final int m;
    final AtomicLong tail;  
    final AtomicLong head;
    
    public ConcurrentQueue(int preferCapacity) {
    	this.capacity = ComUtils.getPow2Value(preferCapacity, MIN_CAPACITY, MAX_CAPACITY);
    	array = new AtomicReferenceArray<T>(this.capacity);
    	falseAdd = new byte[this.capacity];
    	falsePoll = new byte[this.capacity];
    	this.m = this.capacity - 1;
    
    	for (int i = 0; i < als.length; i++) {
    		als[i] = new AtomicLong(0);
    	}
    	head = als[3];
    	tail = als[7];
    }
    
    public boolean offer(T obj) {
    	if(obj == null) throw new NullPointExceprion("Can't put null object into this queue");
    	while(true) {
    		long head = this.head.get();
    		int p =(int) (head & this.m);
    		if(falsePoll[p] > 0) {
    			synchronized(falsePoll) {  //运行比例很低,性能要求不高,直接同步处理
    				if(falsePoll[p] > 0) {  //如果不满足条件,说明失效计数已被其他线程处理,break; 回到最初重新尝试offer
    					if(this.head.compareAndSet(head, head + 1)){ //如果不满足条件,说明位置P已经失效,回到最初重新尝试offer
    						falsePoll[p] --; //跳过一次存在poll失效计数的位置p, poll失效计数 - 1,回到最初重新尝试offer
    					}
    				}
    			}
    			break;
    		}
    		if(array.get(p) != null) return false;
    		if(this.head.compareAndSet(head, head + 1)) {
    			for(int i = 0; i < INTERNAL_PACK_COUNT; i ++) {
    				if(!array.compareAndSet(p, null, obj)) {
    					LockSupport.parkNanos(2 << i);
    				} else return true;
    			}
    			synchronized(falseOffer) {  //运行比例很低,性能要求不高,直接同步处理
    				falseOffer[p] ++;  //位置p的add失效计数器
    			}
    		}
    		return false;
    	}
    	return false;
    }
    
    
    public T poll(){
    	while(true) {
    		T r;
    		long tail = this.tail.get();
    		int p = (int) (tail & this.m);
    		if(falseOffer[p] > 0) {
    			synchronized(falseOffer) {
    				if(this.tail.compareAndSet(tail, tail + 1)) {
    					falseOffer[p]--;
    				}
    			}
    			break;
    		}
    		r = array.get(p);
    		if(r == null) return null;
    		if(this.tail.compareAndSet(tail, tail + 1)) {
    			for(int i = 0; i < INTERNAL_PACK_COUNT; i ++) {
    				if((r = array.getAndSet(p, null)) == null){
    					LockSupport.parkNanos(2 << i);
    				} else return r;
    			}
    			synchronized(falsePoll) {
    				falsePoll[p] ++;
    			}
    		}
    		return null;
    	}
    	return null;
    }

    新增了两个和array等长的byte数组falseOffer[] 和 falsePoll[]作为失败回退计数器,如果在前文提到的offer/ poll 过程中,发生了array的p位置的 CAS或GAS失败,并且无法通过重试少量次数迅速成功,那么将失败回退计数器 falseOffer[p] 或是 falsePoll[p] +1 。

    正常流程中,每次offer / poll 时,先读取falsePoll[p] / falseOffer[p],如果poll 读到的 falseOffer[p] 大于0,说明位置p发生过应该offer却未能成功offer的回退,poll 操作应该忽略位置p一次,此时C线程同步锁定,检查,尝试自增tail,将falseOffer[p] –,然后继续尝试下一个位置。这一连串的过程,是个小概率事件,简单的同步锁就好了,无需过多考虑性能。

    同时,因为回退是小概率事件,所以falseOffer[] 和 falsePoll[]数组很少被修改,所有的对这两个失败回退计数数组的读取,大部分时间都处于 L1/2 缓存有效状态,平均访问耗时应该在1ns左右,性能影响很小。

    再看看这个解决方案的安全性,虽然失败回退是个小概率事件,但数组byte会不会溢出?会不会同一个位置,累计超过127次失败?测试下吧。

    测试结果如下:

    队列长度线程数观察到的byte计数最大值
    2256P – 256C9
    4512P-512C2
    81024P-1024C1

    在将队列长度设置为最小值2,几百个线程操作的时候,观察到了byte数组中有最高9的计数,当队列长度设置到8时,千余线程,经短时运行测试,没有观察到过大于1的计数。而在实际应用中,最小队列长度会限制为1024或更大(高性能服务器弄个很小的队列,没啥意义),这个byte数组的溢出概率极极极极极极极小。如果想省点空间,这个byte数组应该还可以进一步优化,用4个bit来计数就够了。

    高性能 = 减少真共享 + 消除伪共享 + 降低争用 + ….

    还有降低争用一招没用,什么是关键竞争变量,head? tail? array? P线程竞争head,然后竞争array,C线程竞争tail 然后竞争array,当队列长度居中时,array(连续16个引用)就比head/tail竞争更激烈,而当队列满/空时,array的争用压力还需要再相加一下,在大吞吐量,多线程竞争一个资源失败时,如果大家都很激进的重复竞争,将导致这些争用和共享资源反复处于缓存失效状态,降低性能。因此,当某个offer/poll操作失败时,失败的线程需要等待的稍微久一点,再尝试下一次,而不是简单粗暴的packNano(1),当队列一直空,或是满的时候,相关线程更不应该反复循环,应该等久一点,然后重试。这部分就不专门贴代码了,有兴趣可以直接去github上拉源码看。

    在这个过程中,还有一些其他回退检测流程上的小坑也被自然填平了,不再多说。核心要点上面已经全部列出来了。

    老样子,实践是检验真理的唯一标准,继续跑分:

    1P – 1C

    Producer 0 has completed. Cost Per Put 14ns.
    Consumer 0 has completed. Cost Per Take 14ns.
    Total 440M I/O, cost 6282ms, 70,105,367/s, 70.11M/s

    1P – 3C

    Consumer 2 has completed. Cost Per Take 19ns.
    Consumer 1 has completed. Cost Per Take 36ns.
    Producer 0 has completed. Cost Per Put 14ns.
    Consumer 0 has completed. Cost Per Take 42ns.
    Total 440M I/O, cost 6256ms, 70,396,726/s, 70.40M/s

    3P – 1C

    Producer 0 has completed. Cost Per Put 23ns.
    Producer 1 has completed. Cost Per Put 38ns.
    Producer 2 has completed. Cost Per Put 44ns.
    Consumer 0 has completed. Cost Per Take 14ns.
    Total 440M I/O, cost 6519ms, 67,556,668/s, 67.56M/s

    2P – 2C

    Producer 1 has completed. Cost Per Put 14ns.
    Consumer 1 has completed. Cost Per Take 25ns.
    Producer 0 has completed. Cost Per Put 28ns.
    Consumer 0 has completed. Cost Per Take 28ns.
    Total 440M I/O, cost 6315ms, 69,739,021/s, 69.74M/s

    这下心满意足了,下个新版的Distruptor, 比较了一下。 在当前的Distruptor版本中,所有1P的测试,均使用的createSingleProducer创建的非线程安全的Producer,所以*部分,使用了一个非线程安全的队列进行性能比较。其余的1P-nC 的队列,暂无对应的比较对象,将在后续代码/文章中逐步添加。

    Disruptor(Old Ver)
    Nehalem 2.8Ghz
    Disruptor(V3.3)
    Skylake-X 3.3Ghz
    ConcurrentQueue
    Skylake-X 3.3Ghz
    ConcurrentBlockingQueue
    (本文第2节的队列)
    Skylake-X 3.3Ghz
    1P – 1C*134,952,766
    OneToOneSequencedThroughputTest
    *310,360,761
    SimpleBlockingQueue
    Thread-Safe 1P – 1C10,373,443
    RingBuffer.createMultiProducer
    70,105,36734,053,889
    Pipeline: 1P – 3C16,806,15722,128,789
    OneToThreePipelineSequencedThroughputTest
    Sequencer: 3P – 1C13,403,26811,344,299
    ThreeToOneSequencedThroughputTest
    67,556,66816,040,681
    Multicast: 1P – 3C9,377,871168,350,168
    OneToThreeSequencedThroughputTest
    Diamond: 1P – 3C16,143,61322,899,015
    OneToThreeDiamondSequencedThroughputTest
    2P – 2C4,273,504
    TwoToTwoWorkProcessorThroughputTest
    69,739,02113,434,311

    在与Disruptor的可比项之间的比较中,ConcurrentQueue线程安全队列,取得了远高于Disruptor的吞吐量,在多线程高并发争用的条件下实现了超过六千万次每秒的吞吐量。数倍于Disruptor

    这次算是K.O.了。

    这是终点吗?并不是,整条服务器业务处理流水线的大部分地方,通常并不需要真正的线程安全队列。而是更多的需要1P – 1C,或是n为确定数值的 nP – 1C这样的队列,后续将会参照1P – 1C的非线程安全队列 SimpleBlockingQuere,继续添加实现及介绍文章

    本文所涉及到的代码及测试代码,可在https://github.com/Lofint/tachyon 查看/下载

    原创文章,转载请注明: 转载自并发编程网 – www.gofansmi6.com本文链接地址: 告别Disruptor(一) 简洁优雅的高性能并发队列

    FavoriteLoading添加本文到我的收藏
    • Trackback 关闭
    • 评论 (12)
      • 海带
      • 2019/05/21 10:42下午

      勘误,第三部分头几段和表格中步骤3的 getAndIncrease 都应改为compareAndSet

      • whoyou223
      • 2019/06/13 4:30下午

      都是CAS,并没有本质区别,只是disruptor功能强大

      • 海带
      • 2019/06/13 7:44下午

      whoyou223 :
      都是CAS,并没有本质区别,只是disruptor功能强大

      CAS 确实不是关键差异,也不是本文的重点

      1 关键差异一,少用一个共享变量,可参考 http://www.gofansmi6.com/disruptor-writing-ringbuffer/ 里面有一句话 “Disruptor 由消费者负责通知(生产者Barrier)它们处理到了哪个序列号” 这一个共享变量的值变更,在多线程环境下会缓存行失效,需要通过 L3 Cache 读取,通常需要耗费50个时钟周期/12ns或更多的时间。这是造成性能差异的重要原因之一。

      2 我们通常提及的Disruptor的队列,是阻塞的。在流水线中,阻塞队列在应对单消费者线程处理多类和/或多生产者任务时,不得不面对附加的多线程和/或并发队列、任务类别判断等影响性能的问题。这一点可能需要些例子才方便讲清楚。我空了会继续写。

        • xc19891112
        • 2019/06/14 10:38上午

        的确 disruptor做的很强大 缓存填充避免false sharing、CAS、GC、以及事件驱动模型,disruptor 生产和消费都可做到锁无关 ,你是怎么使用disrupto进行测试的代码列出来

      • 海带
      • 2019/06/14 10:53上午

      xc19891112 :
      的确 disruptor做的很强大 缓存填充避免false sharing、CAS、GC、以及事件驱动模型,disruptor 生产和消费都可做到锁无关 ,你是怎么使用disrupto进行测试的代码列出来

      我做的Disruptor的测试都用其自带的测试代码,文中最后一个表格的第三列列出了所有测试代码的类名。CPU型号也已列出,其他环境: 32G DDR4 2133 / Ubuntu 1804 / JDK 1.8.171

      Disruptor自己文章中用的啥测试代码,我简单找了一下没找到明确说法。

      • 流逝的风
      • 2019/06/14 3:16下午

      SimpleBlockingQueue 和 disruptor的OneToOneSequencedThroughputTest 测试一下 没有达到disruptor的吞吐量 相差甚远并且disruptor虽然锁无关 但是有数据一致性 相比SimpleBlockingQueue无法保证

        • 海带
        • 2019/06/14 3:47下午

        这是我的测试结果

        disruptor: OneToOneSequencedThroughputTest

        Starting Disruptor tests
        Run 0, Disruptor=134,408,602 ops/sec BatchPercent=97.80% AverageBatchSize=45
        Run 1, Disruptor=53,590,568 ops/sec BatchPercent=85.72% AverageBatchSize=7
        Run 2, Disruptor=160,771,704 ops/sec BatchPercent=98.39% AverageBatchSize=61
        Run 3, Disruptor=134,770,889 ops/sec BatchPercent=97.18% AverageBatchSize=35
        Run 4, Disruptor=143,678,160 ops/sec BatchPercent=97.69% AverageBatchSize=43
        Run 5, Disruptor=179,533,213 ops/sec BatchPercent=99.07% AverageBatchSize=107
        Run 6, Disruptor=142,857,142 ops/sec BatchPercent=97.56% AverageBatchSize=41

        SimpleBlockingQueue: 1P – 1C
        Producer 0 has completed. Cost Per Put 2ns.
        Consumer 0 has completed. Cost Per Take 2ns.
        Total 440M I/O, cost 1304ms, 337,731,533/s, 337.73M/s

        这是个非线程安全队列(要NUM_CONSUMER = 1;否则会跑失败),但在OneToOneSequencedThroughputTest中Disruptor用的也是 ringBuffer = createSingleProducer(… … …) 而不是 createMultiProducer

        后面不带*的是线程安全比较,

        API方面,当然和Disruptor不是一个量级。但从服务端流水线处理的角度,我认为Disruptor的api有点重。

          • 流逝的风
          • 2019/06/14 3:53下午

          多说无益 研究透了再说灭他吧 谈不上告别

            • 海带
            • 2019/06/14 4:08下午

            不急,这才第一篇?;拐嬉嗨?。

      • 流逝的风
      • 2019/06/14 3:24下午

      你的代码 我也过了一下 实话打败disruptor还远 disruptor针对并发的细节都做了优化 api设计的也很精妙了

    1. 自信是好事,过了就不好了
      多的不说了,我就挑一个简单的说一下,能以文中的方式去使用LockSupport.parkNanos的程序员,不适合写工具类

        • 海带
        • 2019/07/17 7:06下午

        这点你说的很对,parkNanos确实不该这样用。多谢指出

    您必须 登陆 后才能发表评论

    return top

    爱投彩票 cwe| 7go| ke7| sic| e7c| o7a| caa| 7es| es7| goi| k8y| gek| y6y| kio| 6ea| ec6| kqw| a6a| qes| wuo| 7oc| oc7| kqe| k5e| mua| 5ay| cs5| ywk| m5g| gea| 6ou| wky| sq6| ywa| o6u| gca| 4aw| io4| sqm| e55| ges| w5e| ecq| 5ea| oua| my5| qok| s3q| yma| 4qg| om4| uca| g4y| qeu| 4qm| uc4| yec| s4u| q3y| oou|