基于disruptor实现简单topic分发消息功能

前言

disruptor 的性能确实很强,但它只实现了队列的功能。如果有多种消息,就必须自己进行扩展,或者使用多个队列。
因此我自己手写了一套简易的、基于 disruptor 的点对点 topic 分发功能。

思路

通过一个分发处理器,将收到的消息按照 topic 分发到不同的处理器:所有消息由 DistributeEventHandler 按照 DisruptorTopicEnum 分发到不同的 BaseEventHandler 实现类。

发送消息统一通过 DisruptorService 的 sendMessage 方法。

示例

包结构如下图: 包结构图

  • Element
package com.ming.core.disruptor;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

/**
 * Event slot stored in the Disruptor ring buffer: a topic plus an arbitrary payload.
 *
 * <p>Accessors ({@code getTopic}/{@code setTopic}, {@code getData}/{@code setData}) are
 * generated by Lombok's {@code @Data}; the class declares no explicit methods of its own.
 *
 * @param <T> type of the payload carried by this event
 * @author ming
 * @date 2020-10-27 16:17
 */
// NOTE(review): @Slf4j generates a logger that nothing in this class uses — confirm it is needed.
@Slf4j
@Data
public class Element<T> {
    /** Topic used to route this event to a handler. */
    private DisruptorTopicEnum topic;
    /** Payload delivered to the handler for {@link #topic}. */
    private T data;
}
  • DisruptorService
package com.ming.core.disruptor;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.ThreadFactory;

/**
 * Spring-managed entry point for publishing messages onto a single Disruptor ring buffer.
 *
 * <p>Every published {@link Element} is consumed by {@link DistributeEventHandler}, which
 * routes it by topic. The ring buffer is created in {@link #init()} and shut down in
 * {@link #destroy()}.
 *
 * <p>The ring buffer is configured with {@link ProducerType#MULTI} because
 * {@link #sendMessage(DisruptorTopicEnum, Object)} is a public method on a singleton bean and
 * may be called from several threads at once; a single-producer sequencer is only valid when
 * exactly one thread ever publishes.
 *
 * @author ming
 * @date 2020-10-27 17:03
 */
@Primary
@Component
@Slf4j
public class DisruptorService {
    /** Ring buffer capacity; the Disruptor requires this to be a power of two. */
    private static final int BUFFER_SIZE = 16;

    @Autowired
    private DistributeEventHandler distributeEventHandler;

    /** Receives exceptions thrown while {@link #distributeEventHandler} processes an event. */
    @Autowired
    private DisruptorExceptionHandler<Element> disruptorExceptionHandler;

    private Disruptor<Element> elementDisruptor;

    /**
     * Publishes one message to the ring buffer.
     *
     * <p>Claims the next slot, fills it, and always publishes the claimed sequence (even if
     * filling the slot throws) so that the consumer is never left waiting on an unpublished
     * sequence.
     *
     * @param disruptorTopicEnum topic the message is routed by
     * @param data               payload handed to the topic's handler
     * @param <T>                payload type
     * @author ming
     * @date 2020-10-27 17:08
     */
    @SuppressWarnings("unchecked")
    public <T> void sendMessage(DisruptorTopicEnum disruptorTopicEnum, T data) {
        RingBuffer<Element> ringBuffer = elementDisruptor.getRingBuffer();
        // Claim the next free slot.
        long sequence = ringBuffer.next();
        try {
            // The slot object is pre-allocated and reused; overwrite both fields.
            Element<T> event = ringBuffer.get(sequence);
            event.setData(data);
            event.setTopic(disruptorTopicEnum);
        } finally {
            // A claimed sequence must be published, otherwise consumers stall behind it.
            ringBuffer.publish(sequence);
        }
    }

    /**
     * Builds and starts the Disruptor after dependency injection has completed.
     *
     * @author ming
     * @date 2020-10-27 17:12
     */
    @PostConstruct
    @SuppressWarnings("unchecked")
    public void init() {
        // Factory for the thread that runs the event processor (the consumer side).
        ThreadFactory threadFactory = r -> new Thread(r, "simpleThread");
        // Pre-allocates one Element per ring buffer slot.
        EventFactory<Element> factory = Element::new;
        // Consumer blocks on a lock/condition while the buffer is empty.
        BlockingWaitStrategy strategy = new BlockingWaitStrategy();
        // MULTI: sendMessage may be invoked concurrently from several threads.
        elementDisruptor = new Disruptor<>(factory, BUFFER_SIZE, threadFactory, ProducerType.MULTI, strategy);
        // Register the dispatching handler, then attach the exception handler to it.
        elementDisruptor.handleEventsWith(distributeEventHandler);
        elementDisruptor.handleExceptionsFor(distributeEventHandler).with(disruptorExceptionHandler);
        // Start the consumer thread.
        elementDisruptor.start();
    }

    /**
     * Shuts the Disruptor down when the bean is destroyed.
     *
     * @author ming
     * @date 2020-10-27 17:12
     */
    @PreDestroy
    public void destroy() {
        elementDisruptor.shutdown();
    }
}

  • DisruptorTopicEnum
package com.ming.core.disruptor;

/**
 * Topics a message can be published under; each constant is routed to its own handler.
 *
 * @author ming
 * @date 2020-10-27 16:58
 */
public enum DisruptorTopicEnum {
    /** Log messages. */
    LOG_TOPIC,
    /** First test topic. */
    TEST1_TOPIC,
    /** Second test topic. */
    TEST2_TOPIC
}

  • DistributeEventHandler
package com.ming.core.disruptor;

import com.lmax.disruptor.EventHandler;
import com.ming.core.disruptor.handler.LogEventHandler;
import com.ming.core.disruptor.handler.Test1EventHandler;
import com.ming.core.disruptor.handler.Test2EventHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Single Disruptor consumer that routes each event to the handler registered for its topic.
 *
 * @author ming
 * @date 2020-10-27 17:15
 */
@Component
public class DistributeEventHandler implements EventHandler {
    @Autowired
    private Test1EventHandler test1EventHandler;
    @Autowired
    private Test2EventHandler test2EventHandler;
    @Autowired
    private LogEventHandler logEventHandler;

    /**
     * Dispatches one published event to the handler matching its topic.
     *
     * @param event      the ring buffer slot being processed; must be an {@code Element}
     * @param sequence   sequence number of the event
     * @param endOfBatch whether this is the last event of the current batch
     * @throws IllegalArgumentException if {@code event} is not an {@code Element}
     * @throws IllegalStateException    if the event has no topic, or the topic has no handler
     * @throws Exception                declared by the {@code EventHandler} contract
     */
    @Override
    @SuppressWarnings("unchecked")
    public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
        if (!(event instanceof Element)) {
            throw new IllegalArgumentException("类型错误,必须为Element类型~");
        }
        // Every handler currently registered consumes String payloads.
        Element<String> element = (Element<String>) event;
        DisruptorTopicEnum topic = element.getTopic();
        if (topic == null) {
            // Switching on a null enum would throw a NullPointerException with no message.
            throw new IllegalStateException("topic为空!无法分发消息");
        }
        switch (topic) {
            case LOG_TOPIC -> logEventHandler.onEvent(element, sequence, endOfBatch);
            case TEST1_TOPIC -> test1EventHandler.onEvent(element, sequence, endOfBatch);
            case TEST2_TOPIC -> test2EventHandler.onEvent(element, sequence, endOfBatch);
            // Reached only when a new enum constant is added without a matching case.
            default -> throw new IllegalStateException("topic未注册!无法分发消息");
        }
    }
}

  • DisruptorExceptionHandler
package com.ming.core.disruptor;

import com.lmax.disruptor.ExceptionHandler;
import com.ming.core.utils.JacksonJsonSingleton;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * disruptor 异常处理
 *
 * @author ming
 * @date 2020-10-28 11:32
 */
@Component
@Slf4j
public class DisruptorExceptionHandler<T> implements ExceptionHandler<T> {
    /**
     * <p>Strategy for handling uncaught exceptions when processing an event.</p>
     *
     * <p>If the strategy wishes to terminate further processing by the {@link BatchEventProcessor}
     * then it should throw a {@link RuntimeException}.</p>
     *
     * @param ex       the exception that propagated from the {@link EventHandler}.
     * @param sequence of the event which cause the exception.
     * @param event    being processed when the exception occurred.  This can be null.
     */
    @Override
    public void handleEventException(Throwable ex, long sequence, Object event) {
        log.error("处理disruptor事件异常,异常内容:{},编号:{},事件内容:{}", ex.getMessage(), sequence, JacksonJsonSingleton.writeString(event));
        ex.printStackTrace();

    }

    /**
     * Callback to notify of an exception during {@link LifecycleAware#onStart()}
     *
     * @param ex throw during the starting process.
     */
    @Override
    public void handleOnStartException(Throwable ex) {
        log.error("处理disruptor启动异常,异常内容:{}", ex.getMessage());
        ex.printStackTrace();
    }

    /**
     * Callback to notify of an exception during {@link LifecycleAware#onShutdown()}
     *
     * @param ex throw during the shutdown process.
     */
    @Override
    public void handleOnShutdownException(Throwable ex) {
        log.error("处理disruptor关闭异常,异常内容:{}", ex.getMessage());
        ex.printStackTrace();
    }
}

  • BaseEventHandler<T, R>
package com.ming.core.disruptor.handler;

import com.ming.core.disruptor.Element;

/**
 * Contract for a per-topic event handler.
 *
 * @param <T> payload type carried by the {@link Element} this handler accepts
 * @param <R> type of the value returned by {@link #onEvent(Element, long, boolean)}
 * @author ming
 * @date 2020-10-28 09:37
 */
public interface BaseEventHandler<T, R> {

    /**
     * Handles one event.
     *
     * @param event      the event to handle
     * @param sequence   sequence number of the event
     * @param endOfBatch end-of-batch flag passed along with the event
     * @return a handler-defined result
     */
    R onEvent(Element<T> event, long sequence, boolean endOfBatch);
}
  • LogEventHandler
package com.ming.core.disruptor.handler;

import com.ming.core.disruptor.Element;
import org.springframework.stereotype.Component;

/**
 * Handler for log events: prints the payload to standard output.
 *
 * @author ming
 * @date 2020-10-27 17:15
 */
@Component
public class LogEventHandler implements BaseEventHandler<String, String> {

    /**
     * Prints the event payload prefixed with {@code log:}.
     *
     * @param event      event whose payload is printed
     * @param sequence   sequence number of the event (unused)
     * @param endOfBatch end-of-batch flag (unused)
     * @return the constant {@code "log"}
     */
    @Override
    public String onEvent(Element<String> event, long sequence, boolean endOfBatch) {
        final String payload = event.getData();
        System.out.println("log:" + payload);
        return "log";
    }
}
  • Test1EventHandler
package com.ming.core.disruptor.handler;

import com.ming.core.disruptor.Element;
import org.springframework.stereotype.Component;

/**
 * Handler for the first test topic: prints the payload to standard output.
 *
 * @author ming
 * @date 2020-10-27 17:15
 */
@Component
public class Test1EventHandler implements BaseEventHandler<String, String> {

    /**
     * Prints the event payload prefixed with {@code Test1:}.
     *
     * @param event      event whose payload is printed
     * @param sequence   sequence number of the event (unused)
     * @param endOfBatch end-of-batch flag (unused)
     * @return the constant {@code "Test1"}
     */
    @Override
    public String onEvent(Element<String> event, long sequence, boolean endOfBatch) {
        final String payload = event.getData();
        System.out.println("Test1:" + payload);
        return "Test1";
    }
}
  • Test2EventHandler
package com.ming.core.disruptor.handler;

import com.ming.core.disruptor.Element;
import org.springframework.stereotype.Component;

/**
 * Handler for the second test topic: prints the payload to standard output.
 *
 * @author ming
 * @date 2020-10-27 17:15
 */
@Component
public class Test2EventHandler implements BaseEventHandler<String, String> {

    /**
     * Prints the event payload prefixed with {@code Test2:}.
     *
     * @param event      event whose payload is printed
     * @param sequence   sequence number of the event (unused)
     * @param endOfBatch end-of-batch flag (unused)
     * @return the constant {@code "Test2"}
     */
    @Override
    public String onEvent(Element<String> event, long sequence, boolean endOfBatch) {
        final String payload = event.getData();
        System.out.println("Test2:" + payload);
        return "Test2";
    }
}
  • 测试用例
package com.ming.core.disruptor;

import com.ming.Start;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Smoke test for {@link DisruptorService}: publishes messages on three topics concurrently.
 *
 * <p>The test makes no assertion about what the handlers received; it only verifies that all
 * producers finish publishing within the timeout.
 *
 * @author ming
 * @date 2020-10-28 10:21
 */
// NOTE(review): @RunWith is a JUnit 4 annotation while @Test below is JUnit 5 (Jupiter);
// under Jupiter the runner annotation is presumably ignored — confirm and remove if so.
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Start.class)
class DisruptorServiceTest {
    /** Number of messages each producer thread publishes. */
    private static final int MESSAGES_PER_TOPIC = 10000;
    /** Upper bound on how long to wait for a single producer thread to finish. */
    private static final long JOIN_TIMEOUT_MILLIS = 10000L;

    @Autowired
    private DisruptorService disruptorService;

    /**
     * Starts one producer per topic and waits for each to finish instead of sleeping for a
     * fixed time, so the test ends as soon as publishing is done and fails if a producer hangs.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    void sendMessage() throws InterruptedException {
        long now = System.currentTimeMillis();
        Thread[] producers = {
                startProducer(DisruptorTopicEnum.LOG_TOPIC, "minglog", now),
                startProducer(DisruptorTopicEnum.TEST1_TOPIC, "mingtest1", now),
                startProducer(DisruptorTopicEnum.TEST2_TOPIC, "mingtest2", now),
        };
        for (Thread producer : producers) {
            producer.join(JOIN_TIMEOUT_MILLIS);
            if (producer.isAlive()) {
                throw new IllegalStateException(
                        producer.getName() + " did not finish within " + JOIN_TIMEOUT_MILLIS + " ms");
            }
        }
    }

    /**
     * Starts a thread that publishes {@link #MESSAGES_PER_TOPIC} messages on one topic.
     *
     * @param topic       topic to publish on
     * @param label       text placed between the message index and the elapsed time
     * @param startMillis reference time used to compute the elapsed milliseconds in each message
     * @return the started thread
     */
    private Thread startProducer(DisruptorTopicEnum topic, String label, long startMillis) {
        Thread producer = new Thread(() -> {
            for (int i = 0; i < MESSAGES_PER_TOPIC; i++) {
                disruptorService.sendMessage(topic, i + label + (System.currentTimeMillis() - startMillis));
            }
        }, "producer-" + topic);
        producer.start();
        return producer;
    }
}

总结

disruptor 的性能很高,但它本身只负责队列部分,大多数场景需要根据具体需求进行扩展和调整。大多数使用 JDK queue 的场景,都可以用 disruptor 替换。

© 2024 ming博客. All rights reserved.基于rust salvo性能猛的很!