基于disruptor实现简单topic分发消息功能

前言

disruptor 性能的确很强 但是只是做了 队列的功能 如果有多种消息 就必须自己去扩展一下 或者用多个队列
自己手写了一套简易的 基于disruptor 点对点的 topic分发功能

思路

通过一个分发处理器 将收到消息按照topic 分发到不同的处理器
所有的消息由DistributeEventHandler 按照DisruptorTopicEnum 进行分发到不同的BaseEventHandler实现类

发送消息通过DisruptorService

示例

包结构如下图:
包结构图

  • Element

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    package com.ming.core.disruptor;

    import lombok.Data;
    import lombok.extern.slf4j.Slf4j;

    /**
    * 测试disruptor的element
    *
    * @author ming
    * @date 2020-10-27 16:17
    */
    @Slf4j
    @Data
    public class Element<T> {
    private DisruptorTopicEnum topic;
    private T data;
    }
  • DisruptorService

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    package com.ming.core.disruptor;

    import com.lmax.disruptor.BlockingWaitStrategy;
    import com.lmax.disruptor.EventFactory;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.dsl.Disruptor;
    import com.lmax.disruptor.dsl.ProducerType;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Primary;
    import org.springframework.stereotype.Component;

    import javax.annotation.PostConstruct;
    import javax.annotation.PreDestroy;
    import java.util.concurrent.ThreadFactory;

    /**
    * disruptor 服务
    *
    * @author ming
    * @date 2020-10-27 17:03
    */
    @Primary
    @Component
    @Slf4j
    public class DisruptorService {
    @Autowired
    private DistributeEventHandler distributeEventHandler;


    private Disruptor<Element> elementDisruptor;

    /**
    * 发送消息
    *
    * @author ming
    * @date 2020-10-27 17:08
    */
    @SuppressWarnings("unchecked")
    public <T> void sendMessage(DisruptorTopicEnum disruptorTopicEnum, T data) {
    RingBuffer<Element> ringBuffer = elementDisruptor.getRingBuffer();
    // 获取下一个可用位置的下标
    long sequence = ringBuffer.next();
    try {
    // 返回可用位置的元素
    Element<T> event = ringBuffer.get(sequence);
    // 设置该位置元素的值
    event.setData(data);
    event.setTopic(disruptorTopicEnum);
    } finally {
    ringBuffer.publish(sequence);
    }
    }

    /**
    * 初始化 disruptor队列
    *
    * @author ming
    * @date 2020-10-27 17:12
    */
    @PostConstruct
    public void init() {
    //初始化disruptor
    // 生产者的线程工厂
    ThreadFactory threadFactory = r -> new Thread(r, "simpleThread");
    // RingBuffer生产工厂,初始化RingBuffer的时候使用
    EventFactory<Element> factory = Element::new;
    // 阻塞策略
    BlockingWaitStrategy strategy = new BlockingWaitStrategy();
    // 指定RingBuffer的大小
    int bufferSize = 16;
    // 创建disruptor,采用单生产者模式
    elementDisruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);
    // 设置EventHandler 并且后置清理消费过的数据
    elementDisruptor.handleEventsWith(distributeEventHandler);
    elementDisruptor.handleExceptionsFor(distributeEventHandler).with(disruptorExceptionHandler);
    // 启动disruptor的线程
    elementDisruptor.start();
    }

    /**
    * 销毁 disruptor队列
    *
    * @author ming
    * @date 2020-10-27 17:12
    */
    @PreDestroy
    public void destroy() {
    //销毁 disruptor
    elementDisruptor.shutdown();
    }
    }
  • DisruptorTopicEnum

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    package com.ming.core.disruptor;

    /**
    * disruptor topic 枚举
    *
    * @author ming
    * @date 2020-10-27 16:58
    */
    public enum DisruptorTopicEnum {
    LOG_TOPIC, TEST1_TOPIC, TEST2_TOPIC;
    }
  • DistributeEventHandler

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    package com.ming.core.disruptor;

    import com.lmax.disruptor.EventHandler;
    import com.ming.core.disruptor.handler.LogEventHandler;
    import com.ming.core.disruptor.handler.Test1EventHandler;
    import com.ming.core.disruptor.handler.Test2EventHandler;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    /**
    * 基础event handler
    * 根据topic 分发数据
    *
    * @author ming
    * @date 2020-10-27 17:15
    */
    @Component
    public class DistributeEventHandler implements EventHandler {
    @Autowired
    private Test1EventHandler test1EventHandler;
    @Autowired
    private Test2EventHandler test2EventHandler;
    @Autowired
    private LogEventHandler logEventHandler;

    /**
    * Called when a publisher has published an event to the {@link RingBuffer}. The {@link BatchEventProcessor} will
    * read messages from the {@link RingBuffer} in batches, where a batch is all of the events available to be
    * processed without having to wait for any new event to arrive. This can be useful for event handlers that need
    * to do slower operations like I/O as they can group together the data from multiple events into a single
    * operation. Implementations should ensure that the operation is always performed when endOfBatch is true as
    * the time between that message an the next one is inderminate.
    *
    * @param event published to the {@link RingBuffer}
    * @param sequence of the event being processed
    * @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
    * @throws Exception if the EventHandler would like the exception handled further up the chain.
    */
    @Override
    @SuppressWarnings("unchecked")
    public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
    if (!(event instanceof Element)) {
    throw new RuntimeException("类型错误,必须为Element类型~");
    }
    Element<?> element = (Element<?>) event;
    //分发topic 到对应的handler
    switch (element.getTopic()) {
    case LOG_TOPIC -> logEventHandler.onEvent((Element<String>) event, sequence, endOfBatch);
    case TEST1_TOPIC -> test1EventHandler.onEvent((Element<String>) event, sequence, endOfBatch);
    case TEST2_TOPIC -> test2EventHandler.onEvent((Element<String>) event, sequence, endOfBatch);
    default -> throw new RuntimeException("topic未注册!无法分发消息");
    }
    }
    }
  • DisruptorExceptionHandler

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    package com.ming.core.disruptor;

    import com.lmax.disruptor.ExceptionHandler;
    import com.ming.core.utils.JacksonJsonSingleton;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;

    /**
    * disruptor 异常处理
    *
    * @author ming
    * @date 2020-10-28 11:32
    */
    @Component
    @Slf4j
    public class DisruptorExceptionHandler<T> implements ExceptionHandler<T> {
    /**
    * <p>Strategy for handling uncaught exceptions when processing an event.</p>
    *
    * <p>If the strategy wishes to terminate further processing by the {@link BatchEventProcessor}
    * then it should throw a {@link RuntimeException}.</p>
    *
    * @param ex the exception that propagated from the {@link EventHandler}.
    * @param sequence of the event which cause the exception.
    * @param event being processed when the exception occurred. This can be null.
    */
    @Override
    public void handleEventException(Throwable ex, long sequence, Object event) {
    log.error("处理disruptor事件异常,异常内容:{},编号:{},事件内容:{}", ex.getMessage(), sequence, JacksonJsonSingleton.writeString(event));
    ex.printStackTrace();

    }

    /**
    * Callback to notify of an exception during {@link LifecycleAware#onStart()}
    *
    * @param ex throw during the starting process.
    */
    @Override
    public void handleOnStartException(Throwable ex) {
    log.error("处理disruptor启动异常,异常内容:{}", ex.getMessage());
    ex.printStackTrace();
    }

    /**
    * Callback to notify of an exception during {@link LifecycleAware#onShutdown()}
    *
    * @param ex throw during the shutdown process.
    */
    @Override
    public void handleOnShutdownException(Throwable ex) {
    log.error("处理disruptor关闭异常,异常内容:{}", ex.getMessage());
    ex.printStackTrace();
    }
    }
  • BaseEventHandler

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    package com.ming.core.disruptor.handler;

    import com.ming.core.disruptor.Element;

    /**
    * 基础 事件处理接口
    *
    * @author ming
    * @date 2020-10-28 09:37
    */
    public interface BaseEventHandler<T, R> {

    R onEvent(Element<T> event, long sequence, boolean endOfBatch);
    }
  • LogEventHandler

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    package com.ming.core.disruptor.handler;

    import com.ming.core.disruptor.Element;
    import org.springframework.stereotype.Component;

    /**
    * 基础event handler
    * 根据topic 分发数据
    *
    * @author ming
    * @date 2020-10-27 17:15
    */
    @Component
    public class LogEventHandler implements BaseEventHandler<String, String> {


    @Override
    public String onEvent(Element<String> event, long sequence, boolean endOfBatch) {
    System.out.println("log:" + event.getData());
    return "log";
    }
    }
  • Test1EventHandler

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    package com.ming.core.disruptor.handler;

    import com.ming.core.disruptor.Element;
    import org.springframework.stereotype.Component;

    /**
    * 基础event handler
    * 根据topic 分发数据
    *
    * @author ming
    * @date 2020-10-27 17:15
    */
    @Component
    public class Test1EventHandler implements BaseEventHandler<String, String> {


    @Override
    public String onEvent(Element<String> event, long sequence, boolean endOfBatch) {
    System.out.println("Test1:" + event.getData());
    return "Test1";
    }
    }
  • Test2EventHandler

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    package com.ming.core.disruptor.handler;

    import com.ming.core.disruptor.Element;
    import org.springframework.stereotype.Component;

    /**
    * 基础event handler
    * 根据topic 分发数据
    *
    * @author ming
    * @date 2020-10-27 17:15
    */
    @Component
    public class Test2EventHandler implements BaseEventHandler<String, String> {


    @Override
    public String onEvent(Element<String> event, long sequence, boolean endOfBatch) {
    System.out.println("Test2:" + event.getData());
    return "Test2";
    }
    }
  • 测试用例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    package com.ming.core.disruptor;

    import com.ming.Start;
    import org.junit.jupiter.api.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;

    /**
    * disruptor service test
    *
    * @author ming
    * @date 2020-10-28 10:21
    */
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = Start.class)
    class DisruptorServiceTest {
    @Autowired
    private DisruptorService disruptorService;

    @Test
    void sendMessage() {
    int max = 10000;
    long now = System.currentTimeMillis();
    new Thread(() -> {
    for (int i = 0; i < max; i++) {
    disruptorService.sendMessage(DisruptorTopicEnum.LOG_TOPIC, i + "minglog" + (System.currentTimeMillis()-now));
    }
    }).start();
    new Thread(() -> {
    for (int i = 0; i < max; i++) {
    disruptorService.sendMessage(DisruptorTopicEnum.TEST1_TOPIC, i + "mingtest1" + (System.currentTimeMillis()-now));
    }
    }).start();

    new Thread(() -> {
    for (int i = 0; i < max; i++) {
    disruptorService.sendMessage(DisruptorTopicEnum.TEST2_TOPIC, i + "mingtest2" + (System.currentTimeMillis()-now));
    }
    }).start();


    try {
    Thread.sleep(10000L);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }

    }
    }

总结

disruptor 性能是很高 但是他本身只是负责队列部分
大多数场景需要根据具体需求进行扩展调整
大多数用jdk的queue的 都可以用disruptor替换

------ 本文结束 ------

版权声明
ming创作并维护,博客采用CC协议
本文首发于ming 博客( https://blog.xujiuming.com ),版权所有,转载请注明出处!