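Preface
While reading up on application protocols built on top of WebSocket and TCP, I came across the RSocket protocol.
The Spring community, Alibaba, and others are trying it out and working on extensions to it,
so I decided to play with it myself and write down some notes for later reference.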
Introduction
References:
https://docs.spring.io/spring-framework/docs/5.3.15/reference/html/web-reactive.html#rsocket-spring
https://rsocket.io/
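RSocket is an application-level protocol that can run over TCP, UDP (via Aeron), WebSocket, HTTP/2 streams, and other transports.
It offers a uniform, bidirectional communication model with four interaction models: request-response (one request, one response), fire-and-forget (a request with no response), request-stream (one request, many responses), and request-channel (a bidirectional channel with streams in both directions).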
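As a quick reference, the four interaction models can be written down as a small table in code. This is only a sketch in plain Java: the enum and its fields are hypothetical names made up for illustration, not part of rsocket-java or Spring. The frame type codes are the ones the RSocket protocol specification assigns to the four request frames.

```java
/** Illustrative summary of the four RSocket interaction models (hypothetical type, not a library API). */
enum InteractionModel {
    REQUEST_RESPONSE(0x04, "1", "1"),
    REQUEST_FNF(0x05, "1", "0"),          // fire-and-forget
    REQUEST_STREAM(0x06, "1", "many"),
    REQUEST_CHANNEL(0x07, "many", "many");

    final int frameType;        // frame type code of the initial request frame
    final String requestCount;  // how many payloads the requester sends
    final String responseCount; // how many payloads the responder sends back

    InteractionModel(int frameType, String requestCount, String responseCount) {
        this.frameType = frameType;
        this.requestCount = requestCount;
        this.responseCount = responseCount;
    }

    /** Looks up the interaction model that starts with the given frame type code. */
    static InteractionModel fromFrameType(int code) {
        for (InteractionModel m : values()) {
            if (m.frameType == code) {
                return m;
            }
        }
        throw new IllegalArgumentException("not a request frame type: " + code);
    }

    String describe() {
        return String.format("%s (0x%02x): %s request payload(s) -> %s response payload(s)",
                name(), frameType, requestCount, responseCount);
    }
}

class InteractionModelDemo {
    public static void main(String[] args) {
        for (InteractionModel m : InteractionModel.values()) {
            System.out.println(m.describe());
        }
    }
}
```

The `REQUEST_FNF` name matches the frame type that shows up in the frame dump later in this post.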
It also supports the usual reactive features, such as backpressure, session resumption, and leasing.
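Backpressure in RSocket follows the Reactive Streams `request(n)` idea: the receiver tells the sender how many items it is ready for, and the sender emits no more than that. The sketch below shows only that idea, using the JDK's `java.util.concurrent.Flow` interfaces (Java 9+) rather than RSocket or Reactor. `CountingPublisher` and `BackpressureDemo` are hypothetical names, and the publisher is deliberately simplified (synchronous, no validation of `n`).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Emits 0, 1, 2, ... but only as many items as the subscriber has requested. */
class CountingPublisher implements Flow.Publisher<Integer> {
    int emitted = 0; // total number of items handed to the subscriber so far

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private boolean cancelled;

            @Override
            public void request(long n) {
                // Emit exactly n items per request; nothing flows without demand.
                for (long i = 0; i < n && !cancelled; i++) {
                    subscriber.onNext(emitted++);
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }
}

class BackpressureDemo {
    /** Requests `batch` items `batches` times and returns everything that was received. */
    static List<Integer> run(int batch, int batches) {
        List<Integer> received = new ArrayList<>();
        Flow.Subscription[] holder = new Flow.Subscription[1];
        new CountingPublisher().subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { holder[0] = s; }
            @Override public void onNext(Integer item) { received.add(item); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        for (int i = 0; i < batches; i++) {
            holder[0].request(batch); // the consumer decides when more data may flow
        }
        holder[0].cancel();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(2, 3)); // three requests of two items each
    }
}
```

The source is unbounded, but the consumer only ever receives what it asked for. The same contract is what keeps a fast responder from overwhelming a slow requester on a request-stream or request-channel interaction.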
In theory, being a reactive transport protocol gives RSocket a lot of advantages; see the official site for the details.
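Hands-on
In RSocket both ends can arguably be considered servers, since either side can send requests once the connection is up. To tell them apart here, the "server" is the passive side that accepts the connection and the "client" is the active side that opens it.
For convenience, the demo uses Spring Boot RSocket plus test cases.
Dependencies and configuration
The example uses TCP as the underlying transport for RSocket, with the Spring Boot RSocket framework.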
    <!-- ... -->
    <!-- Versions are inherited from the parent; the version currently in use is Spring Boot 2.5.7 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-rsocket</artifactId>
    </dependency>
    <dependency>
        <groupId>io.rsocket</groupId>
        <artifactId>rsocket-transport-local</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>io.rsocket.broker</groupId>
        <artifactId>rsocket-broker-common-spring</artifactId>
        <version>0.3.0</version>
    </dependency>
Server implementation
- Startup class
Just an ordinary Spring Boot application class:

    package com.ming.rsocket;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    public class StartRsocket {

        public static void main(String[] args) {
            SpringApplication.run(StartRsocket.class, args);
        }
    }
- Controller

    package com.ming.rsocket;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
    import org.springframework.messaging.handler.annotation.MessageMapping;
    import org.springframework.stereotype.Controller;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    import java.time.Duration;
    import java.time.LocalDateTime;

    @Controller
    @Slf4j
    public class TestRsocketController {

        // Request-response: one payload in, one payload out.
        // The payload is declared as String rather than Mono<String>, so that logging and
        // string concatenation use the decoded value instead of the Mono's toString().
        @MessageMapping("test-rsocket-req-res")
        public Mono<String> requestResponse(String message) {
            log.info("rsocket req-res test: {}", message);
            return Mono.just("rsocket req-res test, " + message);
        }

        // Request-stream: one payload in, one element emitted per second until cancelled.
        @MessageMapping("test-rsocket-req-stream")
        public Flux<String> requestStream(String message) {
            log.info("req-stream test: {}", message);
            return Flux.interval(Duration.ofSeconds(1))
                    .map(i -> "req-stream test " + i + ":" + message + ":" + LocalDateTime.now());
        }

        // Fire-and-forget: nothing is sent back to the requester.
        @MessageMapping("test-rsocket-fire-and-forget")
        public Mono<Void> fireAndForget(String message) {
            log.info("fire-and-forget test: {}", message);
            return Mono.empty();
        }

        // Request-channel: a stream in, a stream out.
        @MessageMapping("test-rsocket-channel")
        public Flux<String> channel(Flux<String> message) {
            return message.map(m -> "channel test: " + m + ":" + LocalDateTime.now());
        }

        // Turns any exception thrown by the handlers above into a plain message.
        @MessageExceptionHandler
        public Mono<String> handlerException(Exception e) {
            return Mono.just(e.getMessage());
        }
    }
- Configuration

    spring:
      rsocket:
        server:
          port: 20000
          transport: tcp
Client implementation

    package com.ming.rsocket;

    import lombok.extern.slf4j.Slf4j;
    import org.junit.jupiter.api.Test;
    import org.springframework.messaging.rsocket.RSocketRequester;
    import reactor.core.publisher.Flux;

    import java.time.Duration;

    @Slf4j
    public class TestRsocketClient {

        // Requester for the TCP server started above on port 20000.
        RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 20000);

        @Test
        public void testReqRes() {
            String str = requester.route("test-rsocket-req-res")
                    .data("fffffffffffff")
                    .retrieveMono(String.class)
                    .block();
            System.out.println(str);
        }

        @Test
        public void testReqStream() {
            // The server stream never completes, so this test runs until it is interrupted;
            // add e.g. .take(5) before blockLast() to stop after five elements.
            requester.route("test-rsocket-req-stream")
                    .data("fffffffffffffffffffffffffff")
                    .retrieveFlux(String.class)
                    .doOnNext(System.out::println)
                    .blockLast();
        }

        @Test
        public void testFireAndForget() {
            requester.route("test-rsocket-fire-and-forget")
                    .data("fffffffffffffffffffffffffffffffffffff")
                    .send()
                    .block();
        }

        @Test
        public void testChannel() {
            // One outgoing element per second; like testReqStream, this runs until interrupted.
            Flux<String> flux = Flux.interval(Duration.ofSeconds(1)).map(String::valueOf);
            requester.route("test-rsocket-channel")
                    .data(flux)
                    .retrieveFlux(String.class)
                    .log()
                    .blockLast();
        }
    }
Sample RSocket frame

    Frame => Stream ID: 1 Type: REQUEST_FNF Flags: 0b100000000 Length: 79
    Metadata:
             +-------------------------------------------------+
             |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
    +--------+-------------------------------------------------+----------------+
    |00000000| fe 00 00 1d 1c 74 65 73 74 2d 72 73 6f 63 6b 65 |.....test-rsocke|
    |00000010| 74 2d 66 69 72 65 2d 61 6e 64 2d 66 6f 72 67 65 |t-fire-and-forge|
    |00000020| 74                                              |t               |
    +--------+-------------------------------------------------+----------------+
    Data:
             +-------------------------------------------------+
             |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
    +--------+-------------------------------------------------+----------------+
    |00000000| 66 66 66 66 66 66 66 66 66 66 66 66 66 66 66 66 |ffffffffffffffff|
    |00000010| 66 66 66 66 66 66 66 66 66 66 66 66 66 66 66 66 |ffffffffffffffff|
    |00000020| 66 66 66 66 66                                  |fffff           |
    +--------+-------------------------------------------------+----------------+
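To make the metadata bytes in the dump less opaque, here is a minimal sketch in plain Java that decodes them by hand. It assumes the metadata is encoded as RSocket composite metadata carrying a routing entry (well-known MIME id 0x7E, `message/x.rsocket.routing.v0`), which is what the leading `fe` byte suggests. The class and method names are hypothetical and this is not how rsocket-java or Spring decode frames internally.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hand-rolled decoder for the routing entry of a composite-metadata buffer (illustration only). */
class RoutingMetadataSketch {
    static final int ROUTING_MIME_ID = 0x7E; // well-known id of message/x.rsocket.routing.v0

    /** Parses space-separated hex such as "fe 00 00 1d" into bytes. */
    static byte[] hex(String s) {
        String[] parts = s.trim().split("\\s+");
        byte[] out = new byte[parts.length];
        for (int i = 0; i < parts.length; i++) {
            out[i] = (byte) Integer.parseInt(parts[i], 16);
        }
        return out;
    }

    /** Returns all route tags found in routing entries of the composite metadata. */
    static List<String> routes(byte[] md) {
        List<String> tags = new ArrayList<>();
        int pos = 0;
        while (pos < md.length) {
            int first = md[pos++] & 0xFF;
            boolean wellKnown = (first & 0x80) != 0; // high bit set: 7-bit well-known MIME id
            int idOrLen = first & 0x7F;
            if (!wellKnown) {
                pos += idOrLen + 1;                  // custom MIME string, length stored minus one
            }
            // 24-bit big-endian length of this entry's payload
            int len = ((md[pos] & 0xFF) << 16) | ((md[pos + 1] & 0xFF) << 8) | (md[pos + 2] & 0xFF);
            pos += 3;
            if (wellKnown && idOrLen == ROUTING_MIME_ID) {
                int p = pos;
                while (p < pos + len) {              // each tag: 1-byte length, then UTF-8 bytes
                    int tagLen = md[p++] & 0xFF;
                    tags.add(new String(md, p, tagLen, StandardCharsets.UTF_8));
                    p += tagLen;
                }
            }
            pos += len;
        }
        return tags;
    }

    /** Frame size: 6-byte header + 3-byte metadata length + metadata + data. */
    static int frameLength(int metadataBytes, int dataBytes) {
        return 6 + 3 + metadataBytes + dataBytes;
    }

    public static void main(String[] args) {
        byte[] metadata = hex("fe 00 00 1d 1c 74 65 73 74 2d 72 73 6f 63 6b 65 "
                + "74 2d 66 69 72 65 2d 61 6e 64 2d 66 6f 72 67 65 74");
        System.out.println(routes(metadata));
        System.out.println(frameLength(metadata.length, 37)); // 37 data bytes of 'f'
    }
}
```

Read this way, the bytes break down as `fe` (well-known routing MIME type), `00 00 1d` (29 bytes of routing payload), `1c` (a 28-byte tag), and then the route name itself. Adding the 6-byte frame header and the 3-byte metadata length to the 33 metadata bytes and 37 data bytes gives 79, which is consistent with the `Length: 79` printed in the dump.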
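Starting an RSocket server on a second port
An RSocket server has to be told which transport it runs on, so to support, say, both TCP and WebSocket at the same time,
a second server has to be started manually on another port.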
    package com.ming.base.config;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.ObjectProvider;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.rsocket.context.RSocketServerBootstrap;
    import org.springframework.boot.rsocket.netty.NettyRSocketServerFactory;
    import org.springframework.boot.rsocket.server.RSocketServer;
    import org.springframework.boot.rsocket.server.RSocketServerCustomizer;
    import org.springframework.context.ApplicationEventPublisher;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.http.client.reactive.ReactorResourceFactory;
    import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;

    import java.util.stream.Collectors;

    @Configuration
    @Slf4j
    public class RSocketServerTcpConfig implements CommandLineRunner {

        @Autowired
        private ReactorResourceFactory resourceFactory;
        @Autowired
        private ObjectProvider<RSocketServerCustomizer> customizers;
        @Autowired
        private RSocketMessageHandler rSocketMessageHandler;
        @Autowired
        private ApplicationEventPublisher applicationEventPublisher;

        @Override
        public void run(String... args) throws Exception {
            // Build a second server factory next to the auto-configured one.
            NettyRSocketServerFactory factory = new NettyRSocketServerFactory();
            factory.setResourceFactory(resourceFactory);
            // Use RSocketServer.Transport.WEBSOCKET here to serve WebSocket instead.
            factory.setTransport(RSocketServer.Transport.TCP);
            factory.setPort(30000);
            // Collectors.toList() instead of Stream.toList(), which requires Java 16+.
            factory.setRSocketServerCustomizers(customizers.orderedStream().collect(Collectors.toList()));
            log.info("start tcp rsocket server ...........");
            // Reuse the existing message handler so the same @MessageMapping routes are served.
            RSocketServerBootstrap rSocketServerBootstrap =
                    new RSocketServerBootstrap(factory, rSocketMessageHandler.responder());
            rSocketServerBootstrap.setApplicationEventPublisher(applicationEventPublisher);
            rSocketServerBootstrap.start();
        }
    }
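Summary
For now this was just playing around. The deeper topics, such as backpressure and leasing, can wait until I actually need them.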