前言
spring虽然也有事件 但是麻烦
大多数情况下 guava的event足够使用 需要高性能 可以采用disruptor 參考: disruptor使用笔记 基于disruptor实现简单topic分发消息功能
实战
BaseEvent
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| package com.ming.service.event;
public interface BaseEvent {
default boolean async() { return true; } }
|
BaseHandler
1 2 3 4 5 6 7 8 9 10 11 12 13
| package com.ming.service.event;
public interface BaseHandler<T extends BaseEvent> {
void handle(T event); }
|
EventService
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
| package com.ming.service.event;
import com.google.common.eventbus.AsyncEventBus; import com.google.common.eventbus.EventBus; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.ming.core.utils.JSONSingleton; import com.ming.core.utils.SpringBeanManager; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;
@Component @Slf4j public class EventService {
@Autowired List<BaseHandler<?>> handlers;
private EventBus eventBus; private AsyncEventBus asyncEventBus;
public static <T extends BaseEvent> void staticPost(BaseEvent event) { SpringBeanManager.getBean(EventService.class).post(event); }
@PostConstruct private void init() { ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("eventBus-pool-%d").build(); ExecutorService pool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); eventBus = new EventBus(); asyncEventBus = new AsyncEventBus(pool); handlers.forEach(h -> { eventBus.register(h); asyncEventBus.register(h); log.info("{}注册消息总线", h.getClass().getSimpleName()); }); }
public <T extends BaseEvent> void post(BaseEvent event) { log.debug("event:{}", JSONSingleton.writeString(event)); if (event.async()) { asyncEventBus.post(event); } else { eventBus.post(event); } } }
|
使用
1 2 3 4 5 6 7 8 9 10 11 12
| package com.ming.service.event;
import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor;
@Data @AllArgsConstructor @NoArgsConstructor public class TestEvent implements BaseEvent { private long number; }
|
- 定义对应的eventHandler
注意 @Subscribe 表明订阅事件 @AllowConcurrentEvents 表明并发安全 然后内部是根据event类型来分发到不同的订阅方的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| package com.ming.service.event;
import com.google.common.eventbus.AllowConcurrentEvents; import com.google.common.eventbus.Subscribe; import org.springframework.stereotype.Component;
@Component public class TestEventHandler implements BaseHandler<TestEvent> {
@Override @Subscribe @AllowConcurrentEvents public void handle(TestEvent event) { System.out.println(event.toString()); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| package com.ming.service.core;
import com.ming.service.event.EventService; import com.ming.service.event.TestEvent; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;
@Service public class TestEventService {
@Autowired private EventService eventService;
public void test() { eventService.post(new TestEvent(1)); EventService.staticPost(new TestEvent(2)); } }
|
总结
guava 的event 简单粗暴
需要简单明了的事件处理 使用 guava的 event
需要高性能的 可以使用disruptor自己实现一套简单的