disruptor使用笔记

前言

最近做一些日志的采集工作 jdk的队列 有点顶不住了 然后看到 很多知名的中间件 工具 都用这个来替代 由于这个东西用的人很多 文档非常丰富 这里只做例子 实际架构图等等 去官网看即可

参考文档: 美团:https://tech.meituan.com/2016/11/18/disruptor.html 官方:http://lmax-exchange.github.io/disruptor/ 动态消费者博客:https://zhuanlan.zhihu.com/p/100386603### https://www.cnblogs.com/luozhiyun/p/11631305.html

基础概念
  • nP-nC n个生产者-n个消费者
  • Ring Buffer
    Ring Buffer在3.0版本以前被认为是Disruptor的核心组件,但是在之后的版本中只是负责存储和更新数据。在一些高级使用案例中用户也能进行自定义
  • Sequence
    Disruptor使用一组Sequence来作为一个手段来标识特定的组件的处理进度( RingBuffer/Consumer )。每个消费者和Disruptor本身都会维护一个Sequence。虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。
  • Sequencer Sequencer是Disruptor的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
  • Sequence Barrier 保持Sequencer和Consumer依赖的其它Consumer的 Sequence 的引用。除此之外还定义了决定 Consumer 是否还有可处理的事件的逻辑。
  • Wait Strategy Wait Strategy决定了一个消费者怎么等待生产者将事件(Event)放入Disruptor中。
  • Event 从生产者到消费者传递的数据叫做Event。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。
  • EventProcessor 持有特定的消费者的Sequence,并且拥有一个主事件循环(main event loop)用于处理Disruptor的事件。其中BatchEventProcessor是其具体实现,实现了事件循环(event loop),并且会回调到实现了EventHandler的已使用过的实例中。
  • EventHandler 由用户实现的接口,用于处理事件,是 Consumer 的真正实现
  • Producer 生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型
示例1-直接使用
package com.ming;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.junit.Test;

import java.util.concurrent.ThreadFactory;

/**
 * 测试 disruptor
 *
 * @author ming
 * @date 2020-10-27 10:30
 */
public class TestDisruptor {
    /**
     * 基于美团文档的 示例  增加一个消费者
     *
     * @author ming
     * @date 2020-10-27 15:17
     */
    @Test
    public void test() throws InterruptedException {
// 队列中的元素
        class Element {

            private int value;

            public int get() {
                return value;
            }

            public void set(int value) {
                this.value = value;
            }

        }

        // 生产者的线程工厂
        ThreadFactory threadFactory = r -> new Thread(r, "simpleThread");

        // RingBuffer生产工厂,初始化RingBuffer的时候使用
        EventFactory<Element> factory = Element::new;

        // 处理Event的handler
        EventHandler<Element> handler1 = (element, sequence, endOfBatch) -> System.out.println("Element1: " + element.get());
        EventHandler<Element> handler2 = (element, sequence, endOfBatch) -> System.out.println("Element2: " + element.get());

        // 阻塞策略
        BlockingWaitStrategy strategy = new BlockingWaitStrategy();

        // 指定RingBuffer的大小
        int bufferSize = 16;

        // 创建disruptor,采用单生产者模式
        Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);
        // 设置EventHandler
        disruptor.handleEventsWith(handler1);
        disruptor.handleEventsWith(handler2);

        // 启动disruptor的线程
        disruptor.start();

        RingBuffer<Element> ringBuffer = disruptor.getRingBuffer();

        for (int l = 0; true; l++) {
            // 获取下一个可用位置的下标
            long sequence = ringBuffer.next();
            try {
                // 返回可用位置的元素
                Element event = ringBuffer.get(sequence);
                // 设置该位置元素的值
                event.set(l);
            } finally {
                ringBuffer.publish(sequence);
            }
            Thread.sleep(10);
        }
    }
}

示例2-单例使用

包装成一个单例子 方便其他地方调用 个人感觉强行写成单例 。。。有点丑陋

package com.ming.core.disruptor;

/**
 * 测试disruptor的element
 *
 * @author ming
 * @date 2020-10-27 16:17
 */
public class Element {

    private Integer value;

    public Integer get() {
        return value;
    }

    public void set(Integer value) {
        this.value = value;
    }

    public void clear() {
        value = null;
    }
}

package com.ming.core.disruptor;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.ThreadFactory;

/**
 * 测试 disruptor单例
 *
 * @author ming
 * @date 2020-10-27 16:16
 */
public class ElementDisruptorSingleton {
    private static volatile Disruptor<Element> elementDisruptor;

    private ElementDisruptorSingleton() {
    }


    /**
     * 单例对象
     *
     * @return elementDisruptor
     * @author ming
     * @date 2020-10-27 16:24
     */
    public static Disruptor<Element> getInstance() {
        if (null == elementDisruptor) {
            synchronized (Disruptor.class) {
                if (null == elementDisruptor) {
                    // 生产者的线程工厂
                    ThreadFactory threadFactory = r -> new Thread(r, "simpleThread");
                    // RingBuffer生产工厂,初始化RingBuffer的时候使用
                    EventFactory<Element> factory = Element::new;
                    // 处理Event的handler
                    EventHandler<Element> handler1 = (element, sequence, endOfBatch) -> System.out.println("Element1: " + element.get());
                    EventHandler<Element> handler2 = (element, sequence, endOfBatch) -> System.out.println("Element2: " + element.get());
                    // 阻塞策略
                    BlockingWaitStrategy strategy = new BlockingWaitStrategy();
                    // 指定RingBuffer的大小
                    int bufferSize = 16;
                    // 创建disruptor,采用单生产者模式
                    Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);
                    // 设置EventHandler 并且后置清理消费过的数据
                    disruptor.handleEventsWith(handler1);
                    disruptor.handleEventsWith(handler2);
                    elementDisruptor = disruptor;
                    // 启动disruptor的线程
                    elementDisruptor.start();
                }
            }
        }
        return elementDisruptor;
    }
}

package com.ming.core.disruptor;

import com.lmax.disruptor.RingBuffer;

/**
 * 测试disruptor
 *
 * @author ming
 * @date 2020-10-27 16:18
 */
public class TestDisruptor {
    public static void main(String[] args) {
        RingBuffer<Element> ringBuffer = ElementDisruptorSingleton.getInstance().getRingBuffer();
        for (int l = 0; true; l++) {
            // 获取下一个可用位置的下标
            long sequence = ringBuffer.next();
            try {
                // 返回可用位置的元素
                Element event = ringBuffer.get(sequence);
                // 设置该位置元素的值
                event.set(l);
            } finally {
                ringBuffer.publish(sequence);
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

示例3-spring中使用

spring中 注册到ioc容器中

package com.ming.core.disruptor;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.ThreadFactory;

@Component
public class ElementDisruptorService {
    private volatile Disruptor<Element> elementDisruptor;

    @PostConstruct
    public void init() {
        //初始化disruptor
        // 生产者的线程工厂
        ThreadFactory threadFactory = r -> new Thread(r, "simpleThread");
        // RingBuffer生产工厂,初始化RingBuffer的时候使用
        EventFactory<Element> factory = Element::new;
        // 处理Event的handler
        EventHandler<Element> handler1 = (element, sequence, endOfBatch) -> System.out.println("Element1: " + element.get());
        EventHandler<Element> handler2 = (element, sequence, endOfBatch) -> System.out.println("Element2: " + element.get());
        // 阻塞策略
        BlockingWaitStrategy strategy = new BlockingWaitStrategy();
        // 指定RingBuffer的大小
        int bufferSize = 16;
        // 创建disruptor,采用单生产者模式
        elementDisruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);
        // 设置EventHandler 并且后置清理消费过的数据
        elementDisruptor.handleEventsWith(handler1);
        elementDisruptor.handleEventsWith(handler2);
        // 启动disruptor的线程
        elementDisruptor.start();
    }

    @PreDestroy
    public void destroy() {
        //销毁 disruptor
        elementDisruptor.shutdown();
    }

    public void sendMessage() {
        RingBuffer<Element> ringBuffer = elementDisruptor.getRingBuffer();
        for (int l = 0; true; l++) {
            // 获取下一个可用位置的下标
            long sequence = ringBuffer.next();
            try {
                // 返回可用位置的元素
                Element event = ringBuffer.get(sequence);
                // 设置该位置元素的值
                event.set(l);
            } finally {
                ringBuffer.publish(sequence);
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

优化

单生产者和多生产者

如果确定只有一个线程生产 将disruptor设置为单生产者来提高性能
ProducerType.SINGLE

        Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);
等待策略 strategy
  • BlockingWaitStrategy Disruptor的默认策略是BlockingWaitStrategy。在BlockingWaitStrategy内部是使用锁和condition来控制线程的唤醒。BlockingWaitStrategy是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现

  • SleepingWaitStrategy SleepingWaitStrategy 的性能表现跟 BlockingWaitStrategy 差不多,对 CPU 的消耗也类似,但其对生产者线程的影响最小,通过使用LockSupport.parkNanos(1)来实现循环等待。一般来说Linux系统会暂停一个线程约60µs,这样做的好处是,生产线程不需要采取任何其他行动就可以增加适当的计数器,也不需要花费时间信号通知条件变量。但是,在生产者线程和使用者线程之间移动事件的平均延迟会更高。它在不需要低延迟并且对生产线程的影响较小的情况最好。一个常见的用例是异步日志记录。

  • YieldingWaitStrategy YieldingWaitStrategy是可以使用在低延迟系统的策略之一。YieldingWaitStrategy将自旋以等待序列增加到适当的值。在循环体内,将调用Thread.yield(),以允许其他排队的线程运行。在要求极高性能且事件处理线数小于 CPU 逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性。

  • BusySpinWaitStrategy 性能最好,适合用于低延迟的系统。在要求极高性能且事件处理线程数小于CPU逻辑核心树的场景中,推荐使用此策略;例如,CPU开启超线程的特性。

及时清理ring buffer中的对象
     class Element {

            private int value;

            public int get() {
                return value;
            }

            public void set(int value) {
                this.value = value;
            }

            public void clear() {
                value = 0;
            }
        }
...
        disruptor.handleEventsWith(handler1).then(((event, sequence, endOfBatch) -> event.clear()));

总结

disruptor是用来替换jdk 原本的队列 性能会高出很多 不过还是基于内存的 只能在单节点中做一些操作 替换原本使用jdk队列的地方

© 2024 ming博客. All rights reserved.基于rust salvo性能猛的很!