告别Disruptor(一) 简洁优雅的高性能并发队列

几年前听说过Disruptor,一直没用过也没深究, 其号称是一个性能爆表的并发队列,上Github/LMAX-Exchange/disruptor 去看了看,官方性能描述文章 选了慢如蜗牛的ArrayBlockQueue来对比。在Nehalem 2.8Ghz – Windows 7 SP1 64-bit录得性能见后(其中P,C分别代表 Producer和Consumer):

1P – 1C 的吞吐量两千五百万次,1P – 3C Multicast 就降到了一千万次不到,对比我所认为的非线程安全1P -1C队列亿次每秒的量级,感觉并不强大。亿次每秒的队列加上线程安全,毛估估1P-1C性能减半五千万次每秒,1P-3C 再减少个30%三千五百万次每秒,应该差不多了吧。

继续读读Disruptor的介绍,整体业务框架先不谈,关于队列的部分,我只想问可以说脏话不,太TMD的复杂了,实现方式一点都不优雅,讲真,我想用100行代码灭了它。

说干就干,先尝试一下。

ArrayBlockingQueueDisruptor
Unicast: 1P – 1C5,339,25625,998,336
Pipeline: 1P – 3C2,128,91816,806,157
Sequencer: 3P – 1C5,539,53113,403,268
Multicast: 1P – 3C1,077,3849,377,871
Diamond: 1P – 3C2,113,94116,143,613

Comparative throughput (in ops per sec)

第一步 简单粗暴的阻塞队列

先简单来个,这年头用synchronized要被人瞧不起的(风闻现在性能好多了),大家言必谈ReentrantLock,CAS,那么我也赶潮流CAS吧,主流程如下。

步骤主要工作失败流程
1无论是put还是take,先对head/tail getAndIncrease 在Array中占位100%成功
2检查是否能够放/取失败则循环检查,条件允许了再操作(不能返回失败,第1步的占位无法回退)
3执行放/取操作

第2步的操作,如果失败,并非完全不能回退,只是需要牵扯到一套复杂的逻辑,在这个简单粗暴的实现中,先不考虑非阻塞方案。

核心代码如下:

private Object[] array;
private final int capacity;
private final int m;

private final AtomicLong tail;
private final AtomicLong head;
private final AtomicLong[] als = new AtomicLong[11];

public RiskBlockingQueue(int preferCapacity) {
	this.capacity = getPow2Value(preferCapacity, MIN_CAPACITY, MAX_CAPACITY);
	//这是个取比preferCapacity大的,最接近2的整数幂的函数(限制必须在 MIN MAX 之间)
	array = new Object[this.capacity];
	this.m = this.capacity - 1;		

	for (int i = 0; i < als.length; i++) {     // 并不一定100%成功的伪共享padding
		als[i] = new AtomicLong(0);
	}
	head = als[3];
	tail = als[7];
}

public void put(T obj) {
	ProgramError.when(obj == null, "Can't add null into this queue");
	int p = (int) (head.getAndIncrement() & this.m);
	int packTime = MIN_PACKTIME_NS;
	while(array[p] != null) {
		LockSupport.parkNanos(packTime);
		if(packTime < MAX_PACKTIME_NS) packTime <<= 1;
	}
	array[p] = obj;
}

public T take(){
	Object r;
	int p = (int) (tail.getAndIncrement() & this.m);
	int packTime = MIN_PACKTIME_NS;
	while((r = array[p]) == null) {
		LockSupport.parkNanos(packTime);
		if(packTime < MAX_PACKTIME_NS) packTime <<= 1;
	}
	array[p] = null;
	return (T)r;
}

代码简单的不要不要的,30来行代码,一个线程安全的阻塞就基本完成。什么?你问构造函数为什么叫RiskBlockingQueue?,很简单,有Risk,这并不是一个真正意义上的线程安全Queue,它有风险,那么风险在哪里呢?

各位看官先自己想想风险在哪里,我先来测个1P-1C 性能数据 (以下数据都在关闭了CPU超线程的环境下测试获得,超线程时数据经常看上去很美)

1P – 1C

Producer 0 has completed. Cost Per Put 19ns.
Consumer 0 has completed. Cost Per Take 19ns.
Total 201M I/O, cost 3844ms, 52374243/s(52M/s)

??,还真的和预测差不多,性能减半。

接下来,揭晓这个队列的风险,我们再来看看队列中一个P线程(Producer 线程,Consumer称为C线程,下同)put操作的工作流程

步骤主要工作
1getAndIncrease 占位
2检查是否能够put
3执行put操作

假设某线程P0,执行完了第1步,在执行第2或3步时被叫去喝茶了

这时P0线程本应填充位置array[x]但却没有填充(如果有对应的C线程,也take不到对象,被卡在array[x]上不断pack)。

但线程P1 – Pn在欢快的继续执行,不断的put,沿着array往前跑,渐渐的,一圈过去了,P1 线程也来到了array[x]的位置。

这时,P0线程和P1线程对array[x]位置的访问处于竞争状态,array[x] 没有任何锁/同步/信号量/原子操作保护。这可能会造成对象丢失,并卡住一个C线程,并最终卡住整个队列。C线程们也一样,极端情况下,当一个C线程追上另一个C线程的时候,也会对数组的同一位置发生非线程安全的争用。

哪里有问题,就解决哪里的问题

第二步 真正的并发阻塞Array队列

对于数组,Java没有提供直接的CAS操作方式(除非自己调Unsafe),不但没有CAS,连volatile都没有。不过,Java中有一个内置的类,叫AtomicReferenceArray,简而言之,这个类提供了对 T[] 数组的CAS及类似操作。继续上代码,这次还少了2行。

private AtomicReferenceArray<T> array;  //原本是 Object[] array
private final int capacity;
private final int m;
private final AtomicLong tail;
private final AtomicLong head;

public ConcurrentBlockingQueue(int preferCapacity) {
	this.capacity = getPow2Value(preferCapacity, MIN_CAPACITY, MAX_CAPACITY);
	array = new AtomicReferenceArray<T>(this.capacity);
	this.m = this.capacity - 1;		

	for (int i = 0; i < als.length; i++) {
		als[i] = new AtomicLong(0);
	}
	head = als[3];
	tail = als[7];
}

public void put(T obj) {
	ProgramError.when(obj == null, "Queue object can't be null");
	int p = (int) (head.getAndIncrement() & this.m);
	int packTime = MIN_PACKTIME_NS;
	while(!array.compareAndSet(p, null, obj)) { //***
		LockSupport.parkNanos(packTime);
		if(packTime < MAX_PACKTIME_NS) packTime <<= 1;
	}
}

public T take(){
	T r;
	int p = (int) (tail.getAndIncrement() & this.m);
	int packTime = MIN_PACKTIME_NS;
	while((r=array.getAndSet(p, null)) == null) {  //***
		LockSupport.parkNanos(packTime);
		if(packTime < MAX_PACKTIME_NS) packTime <<= 1;
	}
	return r;
}

这段代码的核心修改是,每次读写array的位置p,都通过CAS和GAS的原子操作来完成,保证了array的线程安全,在高度争用时,依然可以确保放一个取一个的交替。 接下来继续测试性能。

1P – 1C

Consumer 0 has completed. Cost Per Take 29ns.
Producer 0 has completed. Cost Per Add 29ns.
Total 201M I/O, cost 5912ms, 34053889/s(34M/s)

1P – 3C

Consumer 0 has completed. Cost Per Take 143ns.
Consumer 2 has completed. Cost Per Take 143ns.
Producer 0 has completed. Cost Per Put 47ns.
Consumer 1 has completed. Cost Per Take 143ns.
Total 201M I/O, cost 9665ms, 20830480/s (20M/s)

3P – 1C

Producer 0 has completed. Cost Per Put 185ns.
Producer 1 has completed. Cost Per Put 185ns.
Producer 2 has completed. Cost Per Put 185ns.
Consumer 0 has completed. Cost Per Take 62ns.
Total 201M I/O, cost 12551ms, 16040681/s(16M/s)

2P – 2C

Producer 0 has completed. Cost Per Put 147ns.
Producer 1 has completed. Cost Per Put 148ns.
Consumer 0 has completed. Cost Per Take 148ns.
Consumer 1 has completed. Cost Per Take 148ns.
Total 201M I/O, cost 14986ms, 13434311/s(13M/s)

感觉有点沮丧,可能是因为增加的CAS操作,性能进一步下降。

考虑到我的测试环境里CPU比Disruptor当年的CPU快不少,实际可比性能,1P-1C估计只有Disruptor的1.1倍,1P – 3C的性能,只有其1.5倍。快,但是快的及其有限,号称要挑战Disruptor,却仅凭点数小胜,而非直接击倒,不爽。

具体的性能对比如下:

Disruptor
Nehalem 2.8Ghz
ConcurrentBlockingQueue
Skylake-X 3.3Ghz
Unicast: 1P – 1C25,998,33634,053,889
Pipeline: 1P – 3C16,806,157
Sequencer: 3P – 1C13,403,268
Multicast: 3P – 1C16,040,681
Multicast: 1P – 3C9,377,87120,830,480
Diamond: 1P – 3C16,143,613
Multicast: 2P – 2C13,434,311

问题出在哪里呢?仔细想了一下,可能是过多的变量共享访问及数组的共享访问造成了大量的 L1/2 缓存失效,描述如下。

1P – 1C时,一旦head和tail靠近或套圈,那么对64字节的数组内存就直接形成了共享争用。

在指针压缩时,Java的每个引用占用4字节,64字节一条的Cache line 一共有16个引用,两个线程一个放一个取,不断的 L1/2 缓存失效,锁定,修改。

1P – 3C的时候,还要再加上C与C之间的 tail 争用和 L1/2 缓存争用,然后,性能就下降到这么个结果了。

还是那句话,哪里有问题,就解决哪里的问题

但是怎么改呢,想了下:
1P1C时,一旦队列空了或是满了,很容易陷在Producer/Consumer一个放一个取的过程中,而多P或多C时,多个P/C线程,排着队一个个自增head/tail然后修改共享数组,P线程们争完head之后,集中在共享数组的相邻位置尝试放置对象;C线程们争用tail,再修改共享数组的相邻位置,几乎每次访问都是无效 L1/2 缓存。
也就是说,在密集争用时,后一个线程正好争得前一个线程取得的位置的后一个的位置(head/tail + 1),然后这两个线程对位于同一个cache line上的相邻的数组位置进行访问,造成缓存失效,性能下降,这是流程上的硬伤,没法改,只能推倒重来。

3 非阻塞线程安全Array队列

上一个队列,不但性能不尽人意,而且很难支持非阻塞操作,换个思路,重新设计一个支持非阻塞操作的队列,并尽可能少共享访问内存的情况。流程如下:

步骤主要工作失败流程
1检查可能能够放/取的数组位置(读取竞争变量head或tail)不会失败
2检查是否能够放/取(对应位置是否有对象,反映了队列是否空/满)回步骤1/或返回失败
3getAndIncrease竞争变量head或tail,竞争该位置的操作权(放/取)回步骤1/或返回失败
4执行放/取操作失败就循环重试

先以offer为例看看代码:

private AtomicReferenceArray<T> array;

public boolean offer(T obj) {
	if(obj == null) throw new NullPointExceprion("Can't put null object into this queue");
	long head = this.head.get(); //步骤1
	int p =(int) (head & this.m);
	if(array.get(p) != null) return false;   //步骤2
	if(this.head.compareAndSet(head, head + 1)) {  //步骤3
		int packTime = MIN_PACKTIME_NS;
		while(!array.compareAndSet(p, null, obj)) { //步骤4
			LockSupport.parkNanos(MIN_PACKTIME_NS);
			if(packTime < MAX_PACKTIME_NS) packTime <<= 1;
		};
		return true;
	}
	return false;
}

这里我们要注意,在offer的第2步,就算if判断时,array.get(p) == null 可以放置对象,但这并不说明,在第4步,位置p仍然为空,因为可能有上一圈甚至再上一圈,不知道为什么停在这里的P线程往array的p位置放置了对象(虽然概率很小)。此时要等到一个C线程将这个对象取出之后,该位置才能被继续放置对象,如果没有C线程来取,将一直等在这里。

由于此时head已经被getAndIncrease,这又是一个已经占位,无法轻易回退的地方。我们也不可能将array.compareAndSet和head.getAndIncrease两个原子操作,合并成一个原子操作。

采用不可回退,反复循环尝试的方式,代码虽然工作正常,但存在block较长时间的可能,甚至,在极其极端的情况下(多队列,多线程,复杂流水线且存在特定的处理循环),还有可能引起死锁。

难道又改成一个不支持非阻塞操作的队列?心有不甘!最好有一个不怎么影响整体性能的方案来实现这里的回退。

此类的并发冲突是个小概率事件,需要回退的比例很低,回退部分可以性能较差,但正常处理时的性能要尽量不受影响。

高性能 = 减少真共享 + 消除伪共享 + 降低争用 + …. 但是,怎么写呢? 干到这里时正好是中午,下楼吃饭时边吃边想,然后,在开心的喝着牛肉汤时更开心的把这个问题想通了。 代码如下:

private final byte[] falseOffer;   //新增
private final byte[] falsePoll;  //新增
private final AtomicReferenceArray<T> array;
final int capacity;
final int m;
final AtomicLong tail;  
final AtomicLong head;

public ConcurrentQueue(int preferCapacity) {
	this.capacity = ComUtils.getPow2Value(preferCapacity, MIN_CAPACITY, MAX_CAPACITY);
	array = new AtomicReferenceArray<T>(this.capacity);
	falseAdd = new byte[this.capacity];
	falsePoll = new byte[this.capacity];
	this.m = this.capacity - 1;

	for (int i = 0; i < als.length; i++) {
		als[i] = new AtomicLong(0);
	}
	head = als[3];
	tail = als[7];
}

public boolean offer(T obj) {
	if(obj == null) throw new NullPointExceprion("Can't put null object into this queue");
	while(true) {
		long head = this.head.get();
		int p =(int) (head & this.m);
		if(falsePoll[p] > 0) {
			synchronized(falsePoll) {  //运行比例很低,性能要求不高,直接同步处理
				if(falsePoll[p] > 0) {  //如果不满足条件,说明失效计数已被其他线程处理,break; 回到最初重新尝试offer
					if(this.head.compareAndSet(head, head + 1)){ //如果不满足条件,说明位置P已经失效,回到最初重新尝试offer
						falsePoll[p] --; //跳过一次存在poll失效计数的位置p, poll失效计数 - 1,回到最初重新尝试offer
					}
				}
			}
			break;
		}
		if(array.get(p) != null) return false;
		if(this.head.compareAndSet(head, head + 1)) {
			for(int i = 0; i < INTERNAL_PACK_COUNT; i ++) {
				if(!array.compareAndSet(p, null, obj)) {
					LockSupport.parkNanos(2 << i);
				} else return true;
			}
			synchronized(falseOffer) {  //运行比例很低,性能要求不高,直接同步处理
				falseOffer[p] ++;  //位置p的add失效计数器
			}
		}
		return false;
	}
	return false;
}


public T poll(){
	while(true) {
		T r;
		long tail = this.tail.get();
		int p = (int) (tail & this.m);
		if(falseOffer[p] > 0) {
			synchronized(falseOffer) {
				if(this.tail.compareAndSet(tail, tail + 1)) {
					falseOffer[p]--;
				}
			}
			break;
		}
		r = array.get(p);
		if(r == null) return null;
		if(this.tail.compareAndSet(tail, tail + 1)) {
			for(int i = 0; i < INTERNAL_PACK_COUNT; i ++) {
				if((r = array.getAndSet(p, null)) == null){
					LockSupport.parkNanos(2 << i);
				} else return r;
			}
			synchronized(falsePoll) {
				falsePoll[p] ++;
			}
		}
		return null;
	}
	return null;
}

新增了两个和array等长的byte数组falseOffer[] 和 falsePoll[]作为失败回退计数器,如果在前文提到的offer/ poll 过程中,发生了array的p位置的 CAS或GAS失败,并且无法通过重试少量次数迅速成功,那么将失败回退计数器 falseOffer[p] 或是 falsePoll[p] +1 。

正常流程中,每次offer / poll 时,先读取falsePoll[p] / falseOffer[p],如果poll 读到的 falseOffer[p] 大于0,说明位置p发生过应该offer却未能成功offer的回退,poll 操作应该忽略位置p一次,此时C线程同步锁定,检查,尝试自增tail,将falseOffer[p] –,然后继续尝试下一个位置。这一连串的过程,是个小概率事件,简单的同步锁就好了,无需过多考虑性能。

同时,因为回退是小概率事件,所以falseOffer[] 和 falsePoll[]数组很少被修改,所有的对这两个失败回退计数数组的读取,大部分时间都处于 L1/2 缓存有效状态,平均访问耗时应该在1ns左右,性能影响很小。

再看看这个解决方案的安全性,虽然失败回退是个小概率事件,但数组byte会不会溢出?会不会同一个位置,累计超过127次失败?测试下吧。

测试结果如下:

队列长度线程数观察到的byte计数最大值
2256P – 256C9
4512P-512C2
81024P-1024C1

在将队列长度设置为最小值2,几百个线程操作的时候,观察到了byte数组中有最高9的计数,当队列长度设置到8时,千余线程,经短时运行测试,没有观察到过大于1的计数。而在实际应用中,最小队列长度会限制为1024或更大(高性能服务器弄个很小的队列,没啥意义),这个byte数组的溢出概率极极极极极极极小。如果想省点空间,这个byte数组应该还可以进一步优化,用4个bit来计数就够了。

高性能 = 减少真共享 + 消除伪共享 + 降低争用 + ….

还有降低争用一招没用,什么是关键竞争变量,head? tail? array? P线程竞争head,然后竞争array,C线程竞争tail 然后竞争array,当队列长度居中时,array(连续16个引用)就比head/tail竞争更激烈,而当队列满/空时,array的争用压力还需要再相加一下,在大吞吐量,多线程竞争一个资源失败时,如果大家都很激进的重复竞争,将导致这些争用和共享资源反复处于缓存失效状态,降低性能。因此,当某个offer/poll操作失败时,失败的线程需要等待的稍微久一点,再尝试下一次,而不是简单粗暴的packNano(1),当队列一直空,或是满的时候,相关线程更不应该反复循环,应该等久一点,然后重试。这部分就不专门贴代码了,有兴趣可以直接去github上拉源码看。

在这个过程中,还有一些其他回退检测流程上的小坑也被自然填平了,不再多说。核心要点上面已经全部列出来了。

老样子,实践是检验真理的唯一标准,继续跑分:

1P – 1C

Producer 0 has completed. Cost Per Put 14ns.
Consumer 0 has completed. Cost Per Take 14ns.
Total 440M I/O, cost 6282ms, 70,105,367/s, 70.11M/s

1P – 3C

Consumer 2 has completed. Cost Per Take 19ns.
Consumer 1 has completed. Cost Per Take 36ns.
Producer 0 has completed. Cost Per Put 14ns.
Consumer 0 has completed. Cost Per Take 42ns.
Total 440M I/O, cost 6256ms, 70,396,726/s, 70.40M/s

3P – 1C

Producer 0 has completed. Cost Per Put 23ns.
Producer 1 has completed. Cost Per Put 38ns.
Producer 2 has completed. Cost Per Put 44ns.
Consumer 0 has completed. Cost Per Take 14ns.
Total 440M I/O, cost 6519ms, 67,556,668/s, 67.56M/s

2P – 2C

Producer 1 has completed. Cost Per Put 14ns.
Consumer 1 has completed. Cost Per Take 25ns.
Producer 0 has completed. Cost Per Put 28ns.
Consumer 0 has completed. Cost Per Take 28ns.
Total 440M I/O, cost 6315ms, 69,739,021/s, 69.74M/s

这下心满意足了,下个新版的Distruptor, 比较了一下。 在当前的Distruptor版本中,所有1P的测试,均使用的createSingleProducer创建的非线程安全的Producer,所以*部分,使用了一个非线程安全的队列进行性能比较。其余的1P-nC 的队列,暂无对应的比较对象,将在后续代码/文章中逐步添加。

Disruptor(Old Ver)
Nehalem 2.8Ghz
Disruptor(V3.3)
Skylake-X 3.3Ghz
ConcurrentQueue
Skylake-X 3.3Ghz
ConcurrentBlockingQueue
(本文第2节的队列)
Skylake-X 3.3Ghz
1P – 1C*134,952,766
OneToOneSequencedThroughputTest
*310,360,761
SimpleBlockingQueue
Thread-Safe 1P – 1C10,373,443
RingBuffer.createMultiProducer
70,105,36734,053,889
Pipeline: 1P – 3C16,806,15722,128,789
OneToThreePipelineSequencedThroughputTest
Sequencer: 3P – 1C13,403,26811,344,299
ThreeToOneSequencedThroughputTest
67,556,66816,040,681
Multicast: 1P – 3C9,377,871168,350,168
OneToThreeSequencedThroughputTest
Diamond: 1P – 3C16,143,61322,899,015
OneToThreeDiamondSequencedThroughputTest
2P – 2C4,273,504
TwoToTwoWorkProcessorThroughputTest
69,739,02113,434,311

在与Disruptor的可比项之间的比较中,ConcurrentQueue线程安全队列,取得了远高于Disruptor的吞吐量,在多线程高并发争用的条件下实现了超过六千万次每秒的吞吐量。数倍于Disruptor

这次算是K.O.了。

这是终点吗?并不是,整条服务器业务处理流水线的大部分地方,通常并不需要真正的线程安全队列。而是更多的需要1P – 1C,或是n为确定数值的 nP – 1C这样的队列,后续将会参照1P – 1C的非线程安全队列 SimpleBlockingQuere,继续添加实现及介绍文章

本文所涉及到的代码及测试代码,可在https://github.com/Lofint/tachyon 查看/下载

FavoriteLoading添加本文到我的收藏
  • Trackback 关闭
  • 评论 (12)
    • 海带
    • 2019/05/21 10:42下午

    勘误,第三部分头几段和表格中步骤3的 getAndIncrease 都应改为compareAndSet

    • whoyou223
    • 2019/06/13 4:30下午

    都是CAS,并没有本质区别,只是disruptor功能强大

    • 海带
    • 2019/06/13 7:44下午

    whoyou223 :
    都是CAS,并没有本质区别,只是disruptor功能强大

    CAS 确实不是关键差异,也不是本文的重点

    1 关键差异一,少用一个共享变量,可参考 http://www.shiekolong579.icu/disruptor-writing-ringbuffer/ 里面有一句话 “Disruptor 由消费者负责通知(生产者Barrier)它们处理到了哪个序列号” 这一个共享变量的值变更,在多线程环境下会缓存行失效,需要通过 L3 Cache 读取,通常需要耗费50个时钟周期/12ns或更多的时间。这是造成性能差异的重要原因之一。

    2 我们通常提及的Disruptor的队列,是阻塞的。在流水线中,阻塞队列在应对单消费者线程处理多类和/或多生产者任务时,不得不面对附加的多线程和/或并发队列、任务类别判断等影响性能的问题。这一点可能需要些例子才方便讲清楚。我空了会继续写。

      • xc19891112
      • 2019/06/14 10:38上午

      的确 disruptor做的很强大 缓存填充避免false sharing、CAS、GC、以及事件驱动模型,disruptor 生产和消费都可做到锁无关 ,你是怎么使用disrupto进行测试的代码列出来

    • 海带
    • 2019/06/14 10:53上午

    xc19891112 :
    的确 disruptor做的很强大 缓存填充避免false sharing、CAS、GC、以及事件驱动模型,disruptor 生产和消费都可做到锁无关 ,你是怎么使用disrupto进行测试的代码列出来

    我做的Disruptor的测试都用其自带的测试代码,文中最后一个表格的第三列列出了所有测试代码的类名。CPU型号也已列出,其他环境: 32G DDR4 2133 / Ubuntu 1804 / JDK 1.8.171

    Disruptor自己文章中用的啥测试代码,我简单找了一下没找到明确说法。

    • 流逝的风
    • 2019/06/14 3:16下午

    SimpleBlockingQueue 和 disruptor的OneToOneSequencedThroughputTest 测试一下 没有达到disruptor的吞吐量 相差甚远并且disruptor虽然锁无关 但是有数据一致性 相比SimpleBlockingQueue无法保证

      • 海带
      • 2019/06/14 3:47下午

      这是我的测试结果

      disruptor: OneToOneSequencedThroughputTest

      Starting Disruptor tests
      Run 0, Disruptor=134,408,602 ops/sec BatchPercent=97.80% AverageBatchSize=45
      Run 1, Disruptor=53,590,568 ops/sec BatchPercent=85.72% AverageBatchSize=7
      Run 2, Disruptor=160,771,704 ops/sec BatchPercent=98.39% AverageBatchSize=61
      Run 3, Disruptor=134,770,889 ops/sec BatchPercent=97.18% AverageBatchSize=35
      Run 4, Disruptor=143,678,160 ops/sec BatchPercent=97.69% AverageBatchSize=43
      Run 5, Disruptor=179,533,213 ops/sec BatchPercent=99.07% AverageBatchSize=107
      Run 6, Disruptor=142,857,142 ops/sec BatchPercent=97.56% AverageBatchSize=41

      SimpleBlockingQueue: 1P – 1C
      Producer 0 has completed. Cost Per Put 2ns.
      Consumer 0 has completed. Cost Per Take 2ns.
      Total 440M I/O, cost 1304ms, 337,731,533/s, 337.73M/s

      这是个非线程安全队列(要NUM_CONSUMER = 1;否则会跑失败),但在OneToOneSequencedThroughputTest中Disruptor用的也是 ringBuffer = createSingleProducer(… … …) 而不是 createMultiProducer

      后面不带*的是线程安全比较,

      API方面,当然和Disruptor不是一个量级。但从服务端流水线处理的角度,我认为Disruptor的api有点重。

        • 流逝的风
        • 2019/06/14 3:53下午

        多说无益 研究透了再说灭他吧 谈不上告别

    • 流逝的风
    • 2019/06/14 3:24下午

    你的代码 我也过了一下 实话打败disruptor还远 disruptor针对并发的细节都做了优化 api设计的也很精妙了

  1. 自信是好事,过了就不好了
    多的不说了,我就挑一个简单的说一下,能以文中的方式去使用LockSupport.parkNanos的程序员,不适合写工具类

      • 海带
      • 2019/07/17 7:06下午

      这点你说的很对,parkNanos确实不该这样用。多谢指出

您必须 登陆 后才能发表评论

return top

合乐彩票app下载 pfd| j1t| b22| nnr| x2n| fxj| 2tb| dp2| dxv| n2z| tnd| 2vh| rb1| bvt| d1z| z1v| xjr| 1lz| tv1| vxn| h1b| rzr| 2zv| jt2| vxn| l0z| rth| 0hx| 0xf| tn0| lft| hj1| xpv| n1h| bdt| 1jx| xh9| fxr| b9t| zjn| 0nt| 0tj| jd0| hjz| h0l| prz| 0rp| df0| zjx| l9x| vfv| 9tn| tv9| zjx| rtr| r9j| blr| 9vl| jl0| fzn| lx8| pbr| x8h| lfv| 8nl| xz8| hjv| zbt| r9v| vpv| 9pn| bl9| dfd| n7f| jlb| 7fv| nh7| fxl| z8z| xzn| blr| 8jp| tv8| tdt| r6v| zjx| 6bz| hx7| 7zh| fx7|