前言
最近做一些日志的采集工作 jdk的队列 有点顶不住了
然后看到 很多知名的中间件 工具 都用这个来替代
由于这个东西用的人很多 文档非常丰富 这里只做例子 实际架构图等等 去官网看即可
参考文档:
美团:https://tech.meituan.com/2016/11/18/disruptor.html
官方:http://lmax-exchange.github.io/disruptor/
动态消费者博客:https://zhuanlan.zhihu.com/p/100386603###
https://www.cnblogs.com/luozhiyun/p/11631305.html
基础概念
- nP-nC n个生产者-n个消费者
- Ring Buffer
Ring Buffer在3.0版本以前被认为是Disruptor的核心组件,但是在之后的版本中只是负责存储和更新数据。在一些高级使用案例中用户也能进行自定义 - Sequence
Disruptor使用一组Sequence来作为一个手段来标识特定的组件的处理进度( RingBuffer/Consumer )。每个消费者和Disruptor本身都会维护一个Sequence。虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。 - Sequencer
Sequencer是Disruptor的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。 - Sequence Barrier
保持Sequencer和Consumer依赖的其它Consumer的 Sequence 的引用。除此之外还定义了决定 Consumer 是否还有可处理的事件的逻辑。 - Wait Strategy
Wait Strategy决定了一个消费者怎么等待生产者将事件(Event)放入Disruptor中。 - Event
从生产者到消费者传递的数据叫做Event。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。 - EventProcessor
持有特定的消费者的Sequence,并且拥有一个主事件循环(main event loop)用于处理Disruptor的事件。其中BatchEventProcessor是其具体实现,实现了事件循环(event loop),并且会回调到实现了EventHandler的已使用过的实例中。 - EventHandler
由用户实现的接口,用于处理事件,是 Consumer 的真正实现 - Producer
生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型示例1-直接使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84package com.ming;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.junit.Test;
import java.util.concurrent.ThreadFactory;
/**
* 测试 disruptor
*
* @author ming
* @date 2020-10-27 10:30
*/
public class TestDisruptor {
/**
* 基于美团文档的 示例 增加一个消费者
*
* @author ming
* @date 2020-10-27 15:17
*/
public void test() throws InterruptedException {
// 队列中的元素
class Element {
private int value;
public int get() {
return value;
}
public void set(int value) {
this.value = value;
}
}
// 生产者的线程工厂
ThreadFactory threadFactory = r -> new Thread(r, "simpleThread");
// RingBuffer生产工厂,初始化RingBuffer的时候使用
EventFactory<Element> factory = Element::new;
// 处理Event的handler
EventHandler<Element> handler1 = (element, sequence, endOfBatch) -> System.out.println("Element1: " + element.get());
EventHandler<Element> handler2 = (element, sequence, endOfBatch) -> System.out.println("Element2: " + element.get());
// 阻塞策略
BlockingWaitStrategy strategy = new BlockingWaitStrategy();
// 指定RingBuffer的大小
int bufferSize = 16;
// 创建disruptor,采用单生产者模式
Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);
// 设置EventHandler
disruptor.handleEventsWith(handler1);
disruptor.handleEventsWith(handler2);
// 启动disruptor的线程
disruptor.start();
RingBuffer<Element> ringBuffer = disruptor.getRingBuffer();
for (int l = 0; true; l++) {
// 获取下一个可用位置的下标
long sequence = ringBuffer.next();
try {
// 返回可用位置的元素
Element event = ringBuffer.get(sequence);
// 设置该位置元素的值
event.set(l);
} finally {
ringBuffer.publish(sequence);
}
Thread.sleep(10);
}
}
}
示例2-单例使用
包装成一个单例子 方便其他地方调用 个人感觉强行写成单例 。。。有点丑陋
1 | package com.ming.core.disruptor; |
1 | package com.ming.core.disruptor; |
1 | package com.ming.core.disruptor; |
示例3-spring中使用
spring中 注册到ioc容器中
1 | package com.ming.core.disruptor; |
优化
单生产者和多生产者
如果确定只有一个线程生产 将disruptor设置为单生产者来提高性能
ProducerType.SINGLE
1 | Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy); |
等待策略 strategy
BlockingWaitStrategy
Disruptor的默认策略是BlockingWaitStrategy。在BlockingWaitStrategy内部是使用锁和condition来控制线程的唤醒。BlockingWaitStrategy是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现SleepingWaitStrategy
SleepingWaitStrategy 的性能表现跟 BlockingWaitStrategy 差不多,对 CPU 的消耗也类似,但其对生产者线程的影响最小,通过使用LockSupport.parkNanos(1)来实现循环等待。一般来说Linux系统会暂停一个线程约60µs,这样做的好处是,生产线程不需要采取任何其他行动就可以增加适当的计数器,也不需要花费时间信号通知条件变量。但是,在生产者线程和使用者线程之间移动事件的平均延迟会更高。它在不需要低延迟并且对生产线程的影响较小的情况最好。一个常见的用例是异步日志记录。YieldingWaitStrategy
YieldingWaitStrategy是可以使用在低延迟系统的策略之一。YieldingWaitStrategy将自旋以等待序列增加到适当的值。在循环体内,将调用Thread.yield(),以允许其他排队的线程运行。在要求极高性能且事件处理线数小于 CPU 逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性。BusySpinWaitStrategy
性能最好,适合用于低延迟的系统。在要求极高性能且事件处理线程数小于CPU逻辑核心树的场景中,推荐使用此策略;例如,CPU开启超线程的特性。
及时清理ring buffer中的对象
1 | class Element { |
总结
disruptor是用来替换jdk 原本的队列
性能会高出很多
不过还是基于内存的 只能在单节点中做一些操作 替换原本使用jdk队列的地方