netty-传输手段笔记

###传输案例

java oio

package com.ming;

import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.Charset;

/**
 * 基于java 阻塞io的 服务端案例
 *
 * @author ming
 * @date 2018-04-11 14:53
 */
public class JavaOio {

    public static void main(String[] args) throws IOException {
        new JavaOio().Start(20000);
    }

    /**
     * 启动java oio 方法
     *
     * @param port
     * @author ming
     * @date 2018-04-11 14:57
     */
    public void Start(int port) throws IOException {
        final ServerSocket socket = new ServerSocket(port);
        for (; ; ) {
            //接受链接
            final Socket clientSocket = socket.accept();
            System.out.println("accept client::" + clientSocket);
            //建立新线程执行 客户端的请求
            new Thread(() -> {
                OutputStream outputStream = null;
                try {
                    //获取输出流
                    outputStream = clientSocket.getOutputStream();
                    //输出数据
                    outputStream.write("hi ming ".getBytes(Charset.forName("UTF-8")));
                    //刷新数据
                    outputStream.flush();
                    clientSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    try {
                        outputStream.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
}

java nio

package com.ming;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;

/**
 * java 实现nio 模式的服务端
 *
 * @author ming
 * @date 2018-04-11 15:08
 */
public class JavaNio {


    public static void main(String[] args) throws IOException {
        new JavaNio().Start(20000);
    }

    /**
     * 启动java nio 服务端
     *
     * @param port
     * @author ming
     * @date 2018-04-11 15:09
     */
    public void Start(int port) throws IOException {
        //打开 server socket channel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //设置 为非阻塞队列
        serverSocketChannel.configureBlocking(false);
        //通过socket channel 获取socket
        ServerSocket socket = serverSocketChannel.socket();
        //获取socket 的地址
        InetSocketAddress socketAddress = new InetSocketAddress(port);
        //绑定 socket地址到socket
        socket.bind(socketAddress);
        //打开selector
        Selector selector = Selector.open();
        //将socket channel 注册到selector上
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        //返回客户端的消息
        final ByteBuffer msg = ByteBuffer.wrap("hi ming ".getBytes("UTF-8"));

        for (; ; ) {
            try {
                //阻塞等待需要处理的新事件
                selector.select();
            } catch (IOException e) {
                e.printStackTrace();
                break;
            }
            //获取所有连接事件的 select key
            Set<SelectionKey> readyKeys = selector.selectedKeys();
            readyKeys.forEach(key -> {
                        try {
                            //检查时间 是否是一个新的 可以被接受的链接
                            if (key.isAcceptable()) {
                                ServerSocketChannel server = (ServerSocketChannel) key.channel();
                                SocketChannel client = server.accept();
                                if (null == client) {
                                    return;
                                }
                                client.configureBlocking(false);
                                client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate());
                                System.out.println("accept from " + client);
                            }
                            //检查socket是否准备好写数据
                            if (key.isWritable()) {
                                SocketChannel client = (SocketChannel) key.channel();
                                ByteBuffer byteBuffer = (ByteBuffer) key.attachment();
                                while (byteBuffer.hasRemaining()) {
                                    if (client.write(byteBuffer) == 0) {
                                        break;
                                    }
                                }
                                client.close();
                            }

                        } catch (IOException e) {
                            key.cancel();
                            try {
                                key.channel().close();
                            } catch (IOException e1) {
                                e1.printStackTrace();
                            }
                        }
                    }
            );
        }
    }
}

netty oio

package com.ming;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.oio.OioServerSocketChannel;
import io.netty.util.CharsetUtil;

import java.net.InetSocketAddress;

/**
 * netty oio 实现
 *
 * @author ming
 * @date 2018-04-16 15:56
 */
public class NettyOio {
    public static void main(String[] args) throws InterruptedException {
        new NettyOio().Start(20000);
    }

    /**
     * netty oio bootstrap
     *
     * @author ming
     * @date 2018-04-16 15:56
     */
    public void Start(int port) throws InterruptedException {
        final ByteBuf byteBuf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("hi ming ", CharsetUtil.UTF_8));
        // 使用 oio 事件循环处理
        EventLoopGroup eventLoopGroup = new OioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(eventLoopGroup)
                    //使用oio 渠道
                    .channel(OioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                    System.out.println("accept " + ctx);
                                    //输出一条信息 关闭 渠道
                                    ctx.writeAndFlush(byteBuf.duplicate())
                                            .addListener(ChannelFutureListener.CLOSE);
                                }
                            });
                        }
                    });
            //绑定服务器
            ChannelFuture future = serverBootstrap.bind().sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully().sync();
        }
    }
}

netty nio

package com.ming;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.CharsetUtil;

import java.net.InetSocketAddress;

/**
 * netty nio 实现
 *
 * @author ming
 * @date 2018-04-16 16:11
 */
public class NettyNio {

    public static void main(String[] args) throws InterruptedException {
        new NettyNio().Start(20000);
    }

    /**
     * 启动 netty nio server
     *
     * @author ming
     * @date 2018-04-16 16:12
     */
    public void Start(int port) throws InterruptedException {
        final ByteBuf byteBuf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("hi ming ", CharsetUtil.UTF_8));
        //使用 nio 事件循环处理器
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(eventLoopGroup)
                    //使用nio渠道
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                    System.out.println("accept " + ctx);
                                    ctx.writeAndFlush(byteBuf.duplicate()).addListener(ChannelFutureListener.CLOSE);
                                }
                            });
                        }
                    });
            ChannelFuture future = serverBootstrap.bind().sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully().sync();
        }


    }
}

epoll

package com.ming;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.CharsetUtil;

import java.net.InetSocketAddress;

/**
 * 基于linux 的Epoll 非阻塞 方式实现的 io传输
 *
 * @author ming
 * @date 2018-04-16 17:28
 */
public class NettyEpoll {

    public static void main(String[] args) throws InterruptedException {
        new NettyEpoll().Start(20000);
    }

    /**
     * 启动 基于 epoll 的server
     *
     * @author ming
     * @date 2018-04-16 17:29
     */
    public void Start(int port) throws InterruptedException {
        final ByteBuf byteBuf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("hi ming ", CharsetUtil.UTF_8));
        //使用epoll
        EventLoopGroup eventLoopGroup = new EpollEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(eventLoopGroup)
                    .channel(EpollServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                    System.out.println("accept" + ctx);
                                    ctx.writeAndFlush(byteBuf.duplicate()).addListener(ChannelFutureListener.CLOSE);
                                }
                            });

                        }
                    });
            ChannelFuture future = serverBootstrap.bind().sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully().sync();
        }
    }

}

jvm local

1:jvm local server

package com.ming.JvmLocal;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.local.LocalServerChannel;
import io.netty.util.CharsetUtil;

/**
 * netty 基于jvm内部本地通信 server实现
 *
 * @author ming
 * @date 2018-04-17 13:45
 */
public class NettyJvmLocalServer {


    /**
     * 启动方法
     *
     * @author ming
     * @date 2018-04-17 13:50
     */
    public void Start() throws InterruptedException {
        final ByteBuf byteBuf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("hi ming ", CharsetUtil.UTF_8));
        //注册 local 事件处理器
        EventLoopGroup eventLoopGroup = new LocalEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(eventLoopGroup)
                    //使用Local server channel
                    .channel(LocalServerChannel.class)
                    //使用 local 地址
                    .localAddress(new LocalAddress(NettyJvmLocal.LOCAL_ADDRESS))
                    .childHandler(new ChannelInitializer<LocalChannel>() {
                        @Override
                        protected void initChannel(LocalChannel ch) throws Exception {
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                    System.out.println("accept " + ctx);
                                    ctx.writeAndFlush(byteBuf).addListener(ChannelFutureListener.CLOSE);
                                }
                            });
                        }
                    });
            ChannelFuture future = serverBootstrap.bind().sync();
            future.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully().sync();
        }


    }

}

2: jvm local client

package com.ming.JvmLocal;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.util.CharsetUtil;

/**
 * netty 基于jvm内部本地通信 client 实现
 *
 * @author ming
 * @date 2018-04-17 13:45
 */
public class NettyJvmLocalClient {


    /**
     * 启动方法
     *
     * @author ming
     * @date 2018-04-17 13:50
     */
    public void Start() throws InterruptedException {
        final ByteBuf byteBuf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("hi ming client", CharsetUtil.UTF_8));
        //注册 local 事件处理器
        EventLoopGroup eventLoopGroup = new LocalEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    //使用 localChannel渠道
                    .channel(LocalChannel.class)
                    //注册 local模式的地址
                    .remoteAddress(new LocalAddress(NettyJvmLocal.LOCAL_ADDRESS))
                    .handler(new ChannelInitializer<LocalChannel>() {
                        @Override
                        protected void initChannel(LocalChannel ch) throws Exception {
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                    System.out.println("connect " + ctx);
                                    ctx.writeAndFlush(byteBuf).addListener(ChannelFutureListener.CLOSE);
                                }
                            });
                        }
                    });
            ChannelFuture future = bootstrap.connect().sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully().sync();
        }
    }

}

3:jvm local demo

package com.ming.JvmLocal;

/**
 * 调用 netty jvm  local server 和client 的客户端
 *
 * @author ming
 * @date 2018-04-17 14:09
 */
public class NettyJvmLocal {
    /**
     * 本地地址
     *
     * @author ming
     * @date 2018-04-17 14:16
     */
    public static final String LOCAL_ADDRESS = "ming";

    public static void main(String[] args) throws InterruptedException {
        // 启动 server
        new Thread(() -> {
            try {
                new NettyJvmLocalServer().Start();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        //暂停 线程 1s 等待server 启动完毕
        Thread.sleep(1000L);

        //启动client
        new Thread(() -> {
            try {
                new NettyJvmLocalClient().Start();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }

}

传输

传输是基于channel 的

channel是有顺序的 因为继承 java.lang.Comparable
channelHandler:常规用途

  • 数据格式转换
  • 异常处理
  • 提供channel 状态变更通知
  • 提供channel 注册或者注销 EventLoop中的通知
  • 提供用户自定义事件通知

channel 线程安全 并且保证顺序发送

内置传输手段

名称备注
NIOio.netty.channel.socket.nio使用java nio
Epollio.netty.channel.epoll基于jni驱动的epoll()和非阻塞io 只能在linux上支持 比NIO快 并且完全非阻塞
OIOio.netty.channel.socket.oio使用java net包 阻塞流
Localio.netty.channel.local直接使用jvm内部管道通信
Embeddedio.netty.channel.embedded测试channelHandler使用的不依赖网络

nio 非阻塞 io

基于selector 状态变化:

  • 新的channel 已经被接受并且准备就绪
  • channel 链接完成
  • channel 有就绪的可供读取的数据
  • channel 可用于写数据
    通过java.nio.channels.SelectionKey定义的位 组合成一组应用程序正在请求通知的状态变化集合
    |名称|描述|
    |:–|:–|
    |OP_ACCEPT|请求接受新链接创建channel获得通知|
    |OP_CONNECT|请求建立第一个链接时候获得通知|
    |OP_READ|请求数据就绪 可从channel 读取时获得通知|
    |OP_WRITE|请求当可以向 channel中写入数据时候 获得通知、|

Epoll 基于linux的本地非阻塞传输

在linux 2.5.44内核引入的功能 比POSIX 的select poll 更加牛逼
不考虑 夸平台 只考虑linux 的话 epoll 直接走起

oio 阻塞io

基于jdk java.net包的阻塞io
是netty 为了兼容 可能存在的老旧的系统 保留的 oio支持

基于jvm内部的local传输

在同一个jvm中运行的客户端和服务端程序直接的异步通信使用这种 可以大幅度提高速度

总结:

java实现oio和nio的代码 提供的api 区别比较大
oio 不需要selector 直接server socket接受 处理
nio 通过selector 协调 分配给相应的 socket 处理
netty 实现oio和nio 提供的api 基本相同 就是在使用发送方式和发送渠道时候 需要保持一致
例如 使用NioEventLoopGroup 那么传输渠道必须是NioServerChannelSocket
使用OioEventLoopGroup那么必须使用OioServerChannelSocket

传输支持的协议

传输类型支持类型
NIOtcp、udp、sctp、udt
Epoll(linux)tcp、udp
OIOtcp、udp、sctp、udt

sctp 增强版本的tcp
udt 增强版本的udp

传输类型选择:

需求类型推荐传输类型备注
非阻塞代码、常规套路nio、或者epoll如果不考虑跨平台等功能 并且只考虑linux 平台运行 那么 epoll 是肯定的 毕竟是个特殊优化的非阻塞模式
阻塞代码oio处理遗留代码的时候选择
同jvm通信jvm local在同一个jvm中 进行通信的话 选择jvm local 可以直接省掉网络io的开销
测试ChannelHandlerEmbedded只有写单元测试用用

零拷贝:

   零拷贝(zero-copy)是一种目前只有在使用 NIO 和 Epoll 传输时才可使用的特性。它使你可以快速
   高效地将数据从文件系统移动到网络接口,而不需要将其从内核空间复制到用户空间,其在像 FTP 或者
   HTTP 这样的协议中可以显著地提升性能。但是,并不是所有的操作系统都支持这一特性。特别地,它对
   于实现了数据加密或者压缩的文件系统是不可用的——只能传输文件的原始内容。反过来说,传输已被
   加密的文件则不是问题。

© 2024 ming博客. All rights reserved.基于rust salvo性能猛的很!