前言 最近做一些日志的采集工作 jdk的队列 有点顶不住了 然后看到 很多知名的中间件 工具 都用这个来替代 由于这个东西用的人很多 文档非常丰富 这里只做例子 实际架构图等等 去官网看即可
参考文档: 美团:https://tech.meituan.com/2016/11/18/disruptor.html 官方:http://lmax-exchange.github.io/disruptor/ 动态消费者博客:https://zhuanlan.zhihu.com/p/100386603### https://www.cnblogs.com/luozhiyun/p/11631305.html
基础概念
nP-nC n个生产者-n个消费者
Ring Buffer Ring Buffer在3.0版本以前被认为是Disruptor的核心组件,但是在之后的版本中只是负责存储和更新数据。在一些高级使用案例中用户也能进行自定义
Sequence Disruptor使用一组Sequence来作为一个手段来标识特定的组件的处理进度( RingBuffer/Consumer )。每个消费者和Disruptor本身都会维护一个Sequence。虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。
Sequencer Sequencer是Disruptor的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
Sequence Barrier 保持Sequencer和Consumer依赖的其它Consumer的 Sequence 的引用。除此之外还定义了决定 Consumer 是否还有可处理的事件的逻辑。
Wait Strategy Wait Strategy决定了一个消费者怎么等待生产者将事件(Event)放入Disruptor中。
Event 从生产者到消费者传递的数据叫做Event。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。
EventProcessor 持有特定的消费者的Sequence,并且拥有一个主事件循环(main event loop)用于处理Disruptor的事件。其中BatchEventProcessor是其具体实现,实现了事件循环(event loop),并且会回调到实现了EventHandler的已使用过的实例中。
EventHandler 由用户实现的接口,用于处理事件,是 Consumer 的真正实现
Producer 生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型
示例1-直接使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 package com.ming;import com.lmax.disruptor.BlockingWaitStrategy;import com.lmax.disruptor.EventFactory;import com.lmax.disruptor.EventHandler;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.dsl.ProducerType;import org.junit.Test;import java.util.concurrent.ThreadFactory;public class TestDisruptor { @Test public void test () throws InterruptedException { class Element { private int value; public int get () { return value; } public void set (int value) { this .value = value; } } ThreadFactory threadFactory = r -> new Thread (r, "simpleThread" ); EventFactory<Element> factory = Element::new ; EventHandler<Element> handler1 = (element, sequence, endOfBatch) -> System.out.println("Element1: " + element.get()); EventHandler<Element> handler2 = (element, sequence, endOfBatch) -> System.out.println("Element2: " + element.get()); BlockingWaitStrategy strategy = new BlockingWaitStrategy (); int bufferSize = 16 ; Disruptor<Element> disruptor = new Disruptor (factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy); disruptor.handleEventsWith(handler1); disruptor.handleEventsWith(handler2); disruptor.start(); RingBuffer<Element> ringBuffer = disruptor.getRingBuffer(); for (int l = 0 ; true ; l++) { long sequence = ringBuffer.next(); try { Element event = ringBuffer.get(sequence); event.set(l); } finally { ringBuffer.publish(sequence); } Thread.sleep(10 ); } } }
示例2-单例使用 包装成一个单例子 方便其他地方调用 个人感觉强行写成单例 。。。有点丑陋
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 package com.ming.core.disruptor;public class Element { private Integer value; public Integer get () { return value; } public void set (Integer value) { this .value = value; } public void clear () { value = null ; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 package com.ming.core.disruptor;import com.lmax.disruptor.BlockingWaitStrategy;import com.lmax.disruptor.EventFactory;import com.lmax.disruptor.EventHandler;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.dsl.ProducerType;import java.util.concurrent.ThreadFactory;public class ElementDisruptorSingleton { private static volatile Disruptor<Element> elementDisruptor; private ElementDisruptorSingleton () { } public static Disruptor<Element> getInstance () { if (null == elementDisruptor) { synchronized (Disruptor.class) { if (null == elementDisruptor) { ThreadFactory threadFactory = r -> new Thread (r, "simpleThread" ); EventFactory<Element> factory = Element::new ; EventHandler<Element> handler1 = (element, sequence, endOfBatch) -> System.out.println("Element1: " + element.get()); EventHandler<Element> handler2 = (element, sequence, endOfBatch) -> System.out.println("Element2: " + element.get()); BlockingWaitStrategy strategy = new BlockingWaitStrategy (); int bufferSize = 16 ; Disruptor<Element> disruptor = new Disruptor (factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy); disruptor.handleEventsWith(handler1); disruptor.handleEventsWith(handler2); elementDisruptor = disruptor; elementDisruptor.start(); } } } return elementDisruptor; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 package com.ming.core.disruptor;import com.lmax.disruptor.RingBuffer;public class TestDisruptor { public static void main (String[] args) { RingBuffer<Element> ringBuffer = ElementDisruptorSingleton.getInstance().getRingBuffer(); for (int l = 0 ; true ; l++) { long sequence = ringBuffer.next(); try { Element event = ringBuffer.get(sequence); event.set(l); } finally { ringBuffer.publish(sequence); } try { Thread.sleep(10 ); } catch (InterruptedException e) { e.printStackTrace(); } } } }
示例3-spring中使用 spring中 注册到ioc容器中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 package com.ming.core.disruptor;import com.lmax.disruptor.BlockingWaitStrategy;import com.lmax.disruptor.EventFactory;import com.lmax.disruptor.EventHandler;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.dsl.ProducerType;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import javax.annotation.PreDestroy;import java.util.concurrent.ThreadFactory;@Component public class ElementDisruptorService { private volatile Disruptor<Element> elementDisruptor; @PostConstruct public void init () { ThreadFactory threadFactory = r -> new Thread (r, "simpleThread" ); EventFactory<Element> factory = Element::new ; EventHandler<Element> handler1 = (element, sequence, endOfBatch) -> System.out.println("Element1: " + element.get()); EventHandler<Element> handler2 = (element, sequence, endOfBatch) -> System.out.println("Element2: " + element.get()); BlockingWaitStrategy strategy = new BlockingWaitStrategy (); int bufferSize = 16 ; elementDisruptor = new Disruptor (factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy); elementDisruptor.handleEventsWith(handler1); elementDisruptor.handleEventsWith(handler2); elementDisruptor.start(); } @PreDestroy public void destroy () { elementDisruptor.shutdown(); } public void sendMessage () { RingBuffer<Element> ringBuffer = elementDisruptor.getRingBuffer(); for (int l = 0 ; true ; l++) { long sequence = ringBuffer.next(); try { Element event = ringBuffer.get(sequence); event.set(l); } finally { ringBuffer.publish(sequence); } try { Thread.sleep(10 ); } catch (InterruptedException e) { e.printStackTrace(); } } } }
优化 单生产者和多生产者 如果确定只有一个线程生产 将disruptor设置为单生产者来提高性能 ProducerType.SINGLE
1 Disruptor<Element> disruptor = new Disruptor (factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);
等待策略 strategy
BlockingWaitStrategy Disruptor的默认策略是BlockingWaitStrategy。在BlockingWaitStrategy内部是使用锁和condition来控制线程的唤醒。BlockingWaitStrategy是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现
SleepingWaitStrategy SleepingWaitStrategy 的性能表现跟 BlockingWaitStrategy 差不多,对 CPU 的消耗也类似,但其对生产者线程的影响最小,通过使用LockSupport.parkNanos(1)来实现循环等待。一般来说Linux系统会暂停一个线程约60µs,这样做的好处是,生产线程不需要采取任何其他行动就可以增加适当的计数器,也不需要花费时间信号通知条件变量。但是,在生产者线程和使用者线程之间移动事件的平均延迟会更高。它在不需要低延迟并且对生产线程的影响较小的情况最好。一个常见的用例是异步日志记录。
YieldingWaitStrategy YieldingWaitStrategy是可以使用在低延迟系统的策略之一。YieldingWaitStrategy将自旋以等待序列增加到适当的值。在循环体内,将调用Thread.yield(),以允许其他排队的线程运行。在要求极高性能且事件处理线数小于 CPU 逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性。
BusySpinWaitStrategy 性能最好,适合用于低延迟的系统。在要求极高性能且事件处理线程数小于CPU逻辑核心树的场景中,推荐使用此策略;例如,CPU开启超线程的特性。
及时清理ring buffer中的对象 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class Element { private int value; public int get () { return value; } public void set (int value) { this .value = value; } public void clear () { value = 0 ; } } ... disruptor.handleEventsWith(handler1).then(((event, sequence, endOfBatch) -> event.clear()));
总结 disruptor是用来替换jdk 原本的队列 性能会高出很多 不过还是基于内存的 只能在单节点中做一些操作 替换原本使用jdk队列的地方