初识Netty

Netty 是基于 Java NIO 的异步事件驱动的网络应用框架,使用 Netty 可以快速开发网络应用.Netty 是完全基于 NIO 实现的,所以整个 Netty 都是异步的。
Netty其实就是封装的NIO,用于简化NIO操作的。

Netty初体验

这里先简单看一下Netty,有很多不懂的地方会慢慢学习和介绍

  1. 导入依赖
    1
    2
    3
    4
    5
    dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.35.Final</version>
    </dependency>
  2. 服务端

    这是一个netty的服务端连接配置程序,第一步创建用于处理NIO类型的连接和读写请求的线程池,创建Netty启动对象,配置服务端和客户端的连接类型(NIO),编写具体的业务处理器。启动Netty。

  • EventLoopGroup是线程池,他是继承线程池的
  • serverBootstrap.bind(9090) 表示netty的启动,是异步的,加上sync()会进行阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class NettyServer {
public static void main(String[] args) {
//创建处理连接客户端请求的线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(10);
//创建只处理客户端读写业务的线程组
EventLoopGroup workerGroup = new NioEventLoopGroup(10);
try {
//这个是netty的启动对象
ServerBootstrap serverBootstrap = new ServerBootstrap();
//对启动对象进行配置
serverBootstrap.group(bossGroup,workerGroup)//线程组的配置
//配置连接通道类型
.channel(NioServerSocketChannel.class)
//队列长度,当线程池满了,请求会暂存到这个队列中
.option(ChannelOption.SO_BACKLOG,1024)
//处理器,创建通道初始化对象,向该对象添加处理器,处理具体业务
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//获得pipeline
ChannelPipeline pipeline = ch.pipeline();

//添加业务处理handler,这里是自定义的 NettyServerHandler
pipeline.addLast(new NettyServerHandler());
}
});
System.out.println("Netty服务端启动了");
//同步阻塞地启动服务端
ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
//只要服务没关闭,该方法会一直阻塞
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//断开所有线程池中的连接
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
  1. 服务端自定义Handler
    这里是自定义的拦截器 需要去实现Netty框架中的 ChannelInboundHandlerAdapter
    • 这里用来处理服务端读写的操作,利用ChannelInboundHandlerAdapter提供的方法实现 读取数据 和写入数据操作
    • ByteBuf 是netty提供的抽象类,他是对bytebuff的封装,使我们更容易操作bytebuff
    • NIO 中使用的是 bytebuff,Netty封装成了bytebuf。
    pAoCPuF.png
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/** channelRead方法是当客户端发送来数据时会自动调用
* ctx含有Channel 和pipeline 的上下文对象
* msg 客户端发来的数据
*/

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("从客户端读取到的数据:"+buf);
}



/**读取数据后调用的方法:发数据给客户端
*
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("hello world".getBytes(StandardCharsets.UTF_8));
//将数据写入通道中
ctx.writeAndFlush(buf);
}
// 当出现异常后会调用的方法
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
  1. 客户端
    客户端的代码逻辑和服务端整体相同,不同的地方:

    • 客户端的启动对象是 bootstrap 而不是 servicebootstrap
    • 客户端启动需要指定 服务端ip
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      public class NettyClient {
      public static void main(String[] args) {
      //创建一个线程组用于事件循环
      EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
      try {
      //创建客户端启动对象
      Bootstrap bootstrap = new Bootstrap();
      //设置相关参数
      bootstrap.group(eventLoopGroup)
      //使用NioSocketChannel作为客户端的通道实现
      .channel(NioSocketChannel.class)
      //创建通道初始化对象并设置handler业务处理器
      .handler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
      //获得pipeline
      ChannelPipeline pipeline = ch.pipeline();
      //添加解码器handler
      //pipeline.addLast(new StringDecoder());
      //添加编码器handler
      //pipeline.addLast(new StringEncoder());
      // pipeline.addLast(new ObjectEncoder());
      //添加处理器,处理器里面是实现具体业务的
      pipeline.addLast(new NettyClientHandler());
      }
      });
      System.out.println("Netty客户端启动了");
      //同步阻塞地告知客户端连接的服务器的地址,并启动客户端
      ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090).sync();
      //阻塞等待完成操作后关闭通道
      channelFuture.channel().closeFuture().sync();
      } catch (InterruptedException e) {
      e.printStackTrace();
      } finally {
      eventLoopGroup.shutdownGracefully();
      }
      }
      }
  2. 客户端自定义Handler
    客户端需要实现一个 channelActive方法,该方法会在客户端连接上服务端后自动调用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /**
    * 当客户端完成连接服务器后调用该方法:向服务端写数据
    */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //使用protostuff编码对象
    ByteBuf buf = Unpooled.copiedBuffer("hello service".getBytes(StandardCharsets.UTF_8));
    ctx.writeAndFlush(buf);
    }

    /**
    * 当通道有读事件发生时调用的方法:读取服务器返回的数据
    * @param ctx
    * @param msg
    */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    System.out.println("来自服务器"+ctx.channel().remoteAddress()+"的消息:"+msg);
    }

    /**
    * @param ctx
    * @param cause
    */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.close();
    }
    }

    最终效果
    pAoP7wQ.png
    pAoPqFs.png

NIO基础概念补充

Netty的底层实现都是由NIO来实现的,没有学过NIO,去学Netty还是有点吃力,来补充一下NIO的基础概念
首先看NIO的实现原理图:
客户端和服务端会先建立连接,然后通过各自的Channel注册到Selector(多路复用器中),当某一个客户端有读写请求时,Selector会在已经注册的Channel中根据SelectionKey查找具体时哪一个客户端的Channel发起的请求,并进行处理。

SelectionKey 是Channel在Selector中注册后,Selector为Channel生成的标识 类似于id


pAoijNd.jpg

epoll

当某一个客户端有请求时,Selector会查找该客户端的Channel,查找的方法有三种:

  • Selector.select()
  • Selector.poll()
  • Selector.epoll()
查找方法操作方式底层实现IO效率最大值
select遍历数组遍历数组中的所有Channel,效率差有上限
poll遍历链表遍历来链表中的Channel,效率差无上限
epoll回调哈希表操作系统回调就绪状态事件无上限

Netty基础知识

Netty线程模型

当有客户端向服务端连接时,BossGroup中的NioEventLoop中的Selector会调用select方法并为该客户端生成一个 processSelectedKeys ,并注册到Worker Group中,此时客户端和服务端会建立一个Channel(通道),标识为 processSelectedKeys,且注册到了Worker Group,当客户端有读写请求时,Worker Group会调用select,根据processSelectedKeys找出是哪个通道发起的读写请求,并交给Pipeline处理具体的读写业务逻辑
pAokCGR.png

Bootstrap

Bootstrap是Netty的启动程序,⼀个Netty应⽤通常由⼀个Bootstrap开始。Bootstrap的主要作⽤是配置Netty程序,串联Netty的各个组件。

  • Bootstrap: 客户端的启动程序
  • ServerBootstrap:服务端启动程序

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//这个是netty的启动对象
ServerBootstrap serverBootstrap = new ServerBootstrap();
//对启动对象进行配置
serverBootstrap.group(bossGroup,workerGroup)//线程组的配置
//配置连接通道类型
.channel(NioServerSocketChannel.class)
//队列长度,当线程池满了,请求会暂存到这个队列中
.option(ChannelOption.SO_BACKLOG,1024)
//处理器,创建通道初始化对象,向该对象添加处理器,处理具体业务
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//获得pipeline
ChannelPipeline pipeline = ch.pipeline();

//添加业务处理handler,这里是自定义的 NettyServerHandler
pipeline.addLast(new NettyServerHandler());
}
});

Future和ChannelFuture

Netty的所有操作都是异步的,即不能⽴刻得知消息是否被正确处理。因此需要通过Future和ChannelFuture来注册监听器,当操作执⾏成果或者失败来调⽤具体的监听器。

  • Future通过sync⽅法来获得同步执⾏的效果。
  • ChannelFuture是Future的⼦类,提供了针对于Channel的异步监听操作
    ChannelFuture 提供了多种监听器的使用,例如在服务端代码进行改造
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
                //注释的代码是阻塞的,我们不用阻塞的方法,然后添加监听器,并体会异步
    // ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
    ChannelFuture channelFuture = serverBootstrap.bind(9090);
    channelFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
    if (channelFuture.isSuccess()){
    System.out.println("启动成功");
    }else {
    System.out.println("启动失败");
    }
    }
    });
    System.out.println("=========");
    //只要服务没关闭,该方法会一直阻塞
    channelFuture.channel().closeFuture().sync();
    结果如下, ====比启动成功先输出,说明了程序的异步的效果,并监听到了Netty启动成功
    pAokco4.png

    Channel

    Channel是Netty⽹络通信的重要组件,⽤于执⾏⽹络IO操作.

    Channel更像是用来描述 客户端和服务端连接的通道,通道中有很多的Handler,其中包括我们自定义的Handler 用于处理业务

    Channel具备以下能⼒:
  • 获得当前⽹络连接通道的状态
  • ⽹络连接的配置参数
  • 提供异步的⽹络IO操作,⽐如建⽴连接、绑定端⼝、读写操作等
  • 获得ChannelFuture实例,并注册监听器到ChannelFuture上,⽤于监听IO操作的成功、失败、取消时的事件回调。
    Channel具体的实现类有以下⼏种:
  • NioSocketChannel: 异步的客户端TCP Socket连接通道
  • NioServerSocketChannel:异步的服务端TCP Socket连接通道
  • NioDatagramChannel:异步的UDP连接通道
  • NioSctpChannel:异步的客户端Sctp连接通道
  • NioSctpServerChannel:异步的服务端Sctp连接通道

    ChannelHandler

    ChannelHandler⽤于处理拦截IO事件,往往在ChannelHandler中可以加⼊业务处理逻辑。ChannelHandler执⾏完后会将IO事件转发到ChannelPipeline中的下⼀个处理程序。

    信息提示 我们自定义的Handler其实就是 ChannelHandler的子类

    ChannelPipeline

    ChannelPipeline是⼀个双向链表,其中保存着多个ChannelHandler。ChannelPipeline实现了⼀种⾼级形式的过滤器模式,在IO操作时发⽣的⼊站和出站事件,会导致ChannelPipeline中的多个ChannelHandler被依次调⽤
    pAoNwsf.png

    Netty的编解码

    编解码是什么?例如 客户端向服务端发送了一句 “你好,服务端”,但这句话在传输过程中是以二进制形式进行传输的,所以在发送之前会进行编码,将这句话编码成二进制,服务端所收到的消息也是二进制的,所以需要解码成 “你好,服务端”。

    入站与出站

    ChannelHandler⽤于处理⼊站和出站数据。
  • ChannelHandler的实现类ChannelInboundHandlerAdapter表示⼊站程序
  • ChannelHandler的实现类ChannelOutboundHandlerAdapter表示出站程序。
  • ⼀个Channel包含⼀个ChannelPipeline,⽽ChannelPipeline维护着由多个ChannelHandlerContext组成的双向链表。且每个ChannelHandlerContext内包含⼀个ChannelHandler
    ⼊站:⽆论是客户端还是服务端,对应着ChannelPipeline中链表的tail尾部。如
    果是接收数据,则为⼊站事件。即数据会从head到tail的过程,会依次经历多个
    ChannelHandler。
    出站:如果发⽣的是发送数据的事件,则数据会从tail尾部发送到head头部,这
    个过程中会先后经历多个ChannelHandler

    编解码器

类型编码器解码器备注
StringStringEncoderStringDncoder
ObjectObjectEncoderObjectDncoder
服务端处理http请求HttpResponseEncoderHttpRequestDecoderHttpServerCodec整合了编解码器
客户端处理http请求HttpResponseEncoderHttpRequestDecoderHttpClientCodec整合了编解码器

String类型编解码器适用于字符串。
Object类型的编码器可以用于对象类型。但是对象类型必须实现序列化

我们来看ObjectEncoder的源码分析

  • 可以看到encode编码方法需要的是 Serializable类型的msg,所以所传对象需要实现Serializable。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    @Sharable
    public class ObjectEncoder extends MessageToByteEncoder<Serializable> {
    private static final byte[] LENGTH_PLACEHOLDER = new byte[4];

    public ObjectEncoder() {
    }

    protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {
    int startIdx = out.writerIndex();
    ByteBufOutputStream bout = new ByteBufOutputStream(out);
    ObjectOutputStream oout = null;

    try {
    bout.write(LENGTH_PLACEHOLDER);
    oout = new CompactObjectOutputStream(bout);
    oout.writeObject(msg);
    oout.flush();
    } finally {
    if (oout != null) {
    oout.close();
    } else {
    bout.close();
    }

    }

    int endIdx = out.writerIndex();
    out.setInt(startIdx, endIdx - startIdx - 4);
    }
    }
    如何配置编码器,很简单在Pipeline中添加即可
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
      serverBootstrap.group(bossGroup,workerGroup)
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    //获得pipeline
    ChannelPipeline pipeline = ch.pipeline();
    //添加解码器handler
    // pipeline.addLast(new StringDecoder());
    pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
    //添加编码器handler
    pipeline.addLast(new ObjectEncoder());
    //添加业务处理handler
    pipeline.addLast(new NettyServerHandler());
    }
    });
    System.out.println("Netty服务端启动了");

    Protostuff提升编解码速度

  1. 引入依赖
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    <!--protostuff-->
    <dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-api</artifactId>
    <version>1.0.10</version>
    </dependency>
    <dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-core</artifactId>
    <version>1.0.10</version>
    </dependency>
    <dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-runtime</artifactId>
    <version>1.0.11</version>
    </dependency>
  2. 使用Protostuff工具提升编解码,就不再需要在Pipeline中添加编解码大的handler了,而是在对应的发送接收消息的方法中使用
    例如在客户端发消息的方法中:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    /**
    * 当客户端完成连接服务器后调用该方法:向服务端写数据
    */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //使用protostuff编码对象
    ByteBuf buf = Unpooled.copiedBuffer(ProtostuffUtil.serialize(new Student(1001L,"小明")));
    ctx.writeAndFlush(buf);
    }

    Netty 实现简单群聊

    服务端

    服务端用来接收客户端所发来的请求,当客户端连上服务端时,会通知其他客户端,有客户端上线了
  3. 第一步编写服务端配置代码 这一步配置代码几乎时固定不变的
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    public class ChatServer {
    public static void main(String[] args) {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    //配置参数
    serverBootstrap.group(bossGroup,workerGroup)
    .channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG,1024)
    .childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    //获得pipeline
    ChannelPipeline pipeline = ch.pipeline();
    //添加handler
    pipeline.addLast(new StringDecoder());
    pipeline.addLast(new StringEncoder());
    //添加业务处理handler
    pipeline.addLast(new ChatServerHandler());
    }
    });
    System.out.println("聊天室启动了...");
    ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
    channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
    }
    }
    }
  4. 第二步编写Handler

    这一步也是最重要的一步。这里的自定义的Handler是实现的 SimpleChannelInboundHandler 而不是之前提到的ChannelInboundHandlerAdapter。其实并没有很大区别,因为SimpleChannelInboundHandler是ChannelInboundHandlerAdapter的子类。
    SimpleChannelInboundHandler在ChannelInboundHandlerAdapter的基础上重写了 channelRead方法,会在channelRead结束后清空内存空间。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    public class ChatServerHandler extends SimpleChannelInboundHandler<String> {


    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    //存放Channel的容器,而且可以执行对每个channel进行操作的任务,任务由GlobalEventExecutor提供的单线程来执行的。
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);


    /**
    * 有新的客户端(channel)连接了,将该客户端的上线信息广播给所有其他的客户端
    *
    * @param ctx
    */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //得到客户端的channel,
    Channel channel = ctx.channel();
    String message = "客户端-" + channel.remoteAddress() + "于" + sdf.format(new Date()) + "上线了\n";
    System.out.println(message);
    //得到其他客户端的channel,向其他客户端发送该客户端的上线信息
    channelGroup.writeAndFlush(message);
    //加入到channelGroup中
    channelGroup.add(channel);
    }

    /**
    * 客户端下线则广播给其他客户端
    *
    * @param ctx
    */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    Channel channel = ctx.channel();
    //生成一个下线的信息
    String message = "客户端-" + channel.remoteAddress() + "下线了\n";
    System.out.println(message);
    //广播给其他客户端
    channelGroup.writeAndFlush(message);

    }

    /**
    * 具体的读数据的业务
    */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    //获得当前发消息的客户端的channel
    Channel channel = ctx.channel();
    //遍历所有的channel
    channelGroup.forEach(ch -> {
    if (channel != ch) {
    //发给其他客户端
    ch.writeAndFlush("客户端-" + channel.remoteAddress() + "于" + sdf.format(new Date()) + "说:"
    + msg + "\n"+"_");
    }else{
    //发给当前发消息的客户端(自己)
    ch.writeAndFlush("我于"+ sdf.format(new Date()) + "说:"
    + msg + "\n"+"_");
    }
    });
    }

    }

    客户端

    客户端需要实现的就是发消息和接收消息

    这里的发消息并不是写在Handler里的

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    public class ChatClient {
    public static void main(String[] args) {
    EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    try {
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(eventLoopGroup)
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast(new StringDecoder());
    pipeline.addLast(new StringEncoder());
    pipeline.addLast(new ChatClientHandler());
    }
    });
    ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090).sync();
    Channel channel = channelFuture.channel();
    //发送消息
    System.out.println("欢迎进入小怪兽聊天室");
    Scanner scanner = new Scanner(System.in);
    while(scanner.hasNextLine()){
    String message = scanner.nextLine();
    //发送消息到channel
    channel.writeAndFlush(message);
    }
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    eventLoopGroup.shutdownGracefully();
    }

    }
    }
  5. 编写Handler
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public class ChatClientHandler extends SimpleChannelInboundHandler<String> {

    /**
    * 打印消息在控制台
    * @param ctx
    * @param msg
    * @throws Exception
    */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    System.out.println(msg.trim());
    }
    }

    最终结果

    pATkaC9.png

粘包和拆包

  • 粘包
    缓冲区数据量满了就会作为整体来发送,⽽这个整体中包含了多条业务数据,那这种情况就是粘包
  • 拆包
    在缓冲区数据量满的时候,把⼀条数据分成了两次缓冲区发送,这种情况就是拆包。

    简单来说 粘包就是多条数据被当成一条数据发送出去了,拆包就是一条数据被当成多条数据发送出去了

    例如我们让客户端2号一直发送小怪兽,会发现很多小怪兽会连在一块发送,这就是粘包
    pATGUeg.png

粘包和拆包的解决方法

其实解决方法也很简单,无非是传送数据时,不知道每条数据的开始和结束,我们让netty知道每条数据的开始和结束即可
Netty为粘包和拆包提供了多个解码器,每个解码器配有相应的分包解决⽅案

  1. LineBasedFrameDecoder:回⻋换⾏分包,以回⻋换⾏为分包的依据
    1
    pipeline.addLast(new LineBasedFrameDecoder(1024))
  2. DelimiterBasedFrameDecoder:特殊分隔符分包,以指定的特殊分隔符为分包依据,局限是消息内容中不能出现特殊分隔符
    1
    pipeline.addLast(new DelimiterBasedFrameDecoder(1024,Unpooled.copiedBuffer("_".getBytes())));
  3. FixedLengthFrameDecode:固定⻓度报⽂分包,消息⻓度被指定,不⾜的以空格补⾜。
    1
    pipeline.addLast(new FixedLengthFrameDecoder(1024))
  4. ⾃定义分包
    ⾃定义分包往往在发送每条数据的时候,将数据的⻓度⼀并发送。⽐如发送的数据的前4个字节⽤来表示消息的实际⻓度,之后根据消息的实际⻓度来获得具体的数据

    自定义分包拦截器

    在自定义分包中,其实就是将消息分为两部分,前一部分代表消息占多少个字节,后一部分表示真正的消息,这样在解码器中,我们读取到消息的长度,就可以准确的发送,避免了粘包和拆包
  • 消息:客户端发送的消息内容
  • 消息协议:这⼀次消息需要包含两个部分,即消息⻓度和消息内容本身。
  • ⾃定义消息编码器:消息编码器将客户端发送的消息转换成遵守消息协议的消息,即包含消息⻓度和消息内容的消息
  • ⾃定义消息解码器:消息解码器根据消息协议的消息⻓度,来获得指定⻓度的消息内容
    pA79kSx.png

    自定义消息协议

    这里定义了一个MessageProtocol实体类用来存储消息的长度和消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public class MessageProtocol {
    //消息的长度
    private int length;
    //消息的内容
    private byte[] content;

    public int getLength() {
    return length;
    }

    public void setLength(int length) {
    this.length = length;
    }

    public byte[] getContent() {
    return content;
    }

    public void setContent(byte[] content) {
    this.content = content;
    }
    }

    再来改进一下客户端的配置代码

    这里自定义一个业务处理器和一个编码器。当然还可以添加自定义的解码器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    public class NettyClient {

    public static void main(String[] args) {
    EventLoopGroup group = new NioEventLoopGroup(1);
    try {
    Bootstrap bootstrap = new Bootstrap();
    //设置相关的参数
    bootstrap.group(group)
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    //添加处理器,分包编码器
    pipeline.addLast(new MessageEncoder());
    //添加具体的业务处理器
    pipeline.addLast(new NettyMessageClientHandler());
    }
    });
    System.out.println("客户端启动了");
    ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090).sync();
    channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    group.shutdownGracefully();
    }
    }


    }
  • 自定义的业务处理器

    在 channelActive 方法中实现的客户端连上服务端后具体的业务,这里实现一个简单的客户端连上服务端后 发送两百条“小怪兽”的消息,
    首先就需要将String字符串转成字节,(为什么转成字节,那当然是因为netty中消息在通道中是以byte的形式传送,所以前面才会讲到编解码器,这里转成字节,是为了获取到消息转换成字节的长度),然后将字节形式的消息封装到MessageProtocol中

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    public class NettyMessageClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {

    //连接通道创建后要向服务端发送消息
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    for (int i = 0; i < 200; i++) {
    String msg = "小怪兽";
    //创建消息协议对象
    MessageProtocol messageProtocol = new MessageProtocol();
    //封装长度和内容
    byte[] content = msg.getBytes(StandardCharsets.UTF_8);
    messageProtocol.setLength(content.length);
    messageProtocol.setContent(content);
    //发送消息协议对象,注意此时ctx只能发送Bytebuf数据,因此需要用编码器把它编码成Bytebuf数据
    ctx.writeAndFlush(messageProtocol);
    }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {

    }
    }

    自定义编码器

    这样客户端的消息就发出去了,消息发出去后会经过编码器,这里需要继承MessageToByteEncoder类并指定泛型

    什么?你问我为什么要继承MessageToByteEncoder,因为他是专门为了自定义编码器而生的 !在这个类里面为我们封装好了encode 编码方法,他很只能的为我们提供了out.writeInt方法用来封装消息的长度, out.writeBytes封装消息。
    其实在之前提到的StringEncoder,ObjectEncoder 也都实现了 MessageToMessageEncoder

    1
    2
    3
    4
    5
    6
    7
    8
    public class MessageEncoder extends MessageToByteEncoder<MessageProtocol> {

    @Override
    protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
    out.writeInt(msg.getLength());
    out.writeBytes(msg.getContent());
    }
    }
    以上客户端的代码就简单完成了,再来看服务端

    服务端

    服务端的更改也只是添加了一个业务拦截器和自定义解码器
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    public class NettyServer {
    public static void main(String[] args) {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(bossGroup,workerGroup)
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    //添加解码器
    pipeline.addLast(new MessageDecoder());
    pipeline.addLast(new NettyMessageServerHandler());
    }
    });
    System.out.println("Netty服务端启动了");
    ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
    channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
    }
    }
    }

    自定义解码器

    解码器就稍微复杂一些,客户端发来的消息经过编码器后最后会以ByteBuf的形式传到服务端的解码器,然后需要判断可读的字节数是否大于4,不大于4说明所传入的消息中并没有数据

    这时候就有人要问了,为什么要跟4判断,那当然是因为我们在 MessageProtocol实体类中创建用于存储消息长度的length是int类型的,int类型占4个字节

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    public class MessageDecoder extends ByteToMessageDecoder {

    int length = 0;


    /**
    *
    * @param ctx
    * @param in 客户端发送来的MessageProtocol编码后的ByteBuf数据
    * @param out out里的数据会被放行到下一个handler。把解码出来的MessageProtocol放到out里面
    * @throws Exception
    */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    System.out.println("ByteBuf:"+in);
    //获得前面4个字节的数据 == 描述实际内容的长度
    if(in.readableBytes()>=4){
    //ByteBuf里面可能有MessageProtocol数据
    if(length == 0){
    length = in.readInt();
    }//length = 15
    if(in.readableBytes()<length){
    //说明数据还没到齐,等待下一次调用decode
    System.out.println("当前数据量不够,继续等待");
    return;
    }
    //可读数据量>=length ==> 意味着这一次的MessageProtocol中的内容已经到齐了
    //创建一个指定length长度的字节数组
    byte[] content = new byte[length];
    //把ByteBuf里的指定长度的数据读到content数组中
    in.readBytes(content);
    //创建MessageProtocol对象并赋值
    MessageProtocol messageProtocol = new MessageProtocol();
    messageProtocol.setLength(length);
    messageProtocol.setContent(content);
    //传递给下一个handler
    out.add(messageProtocol);
    //重置length
    length = 0;
    }
    }
    }
  • 业务Handler
    1
    2
    3
    4
    5
    6
    7
    8
    9
    public class NettyMessageServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
    System.out.println("---服务器收到的数据---");
    System.out.println("消息的长度:"+msg.getLength());
    System.out.println("消息的内容:"+new String(msg.getContent(), StandardCharsets.UTF_8));
    }
    }
    以上就是自定义编解码器 效果如下
    pA7CGCR.png

心跳

Netty在TCP的⻓连接中,客户端定期向服务端发送⼀种特殊的数据包,告知对⽅⾃⼰正常在线,以确保TCP连接的有效性。Netty实现⼼跳的关键需要通过IdleStateHandler。

心跳机制就是为了确保对方正常在线,避免出现客户端虽然连上了服务端,但是长时间不进行操作,会占用浪费资源,而心跳机制就是检测客户端在一定时间内有没有发来数据,如果没有则会断掉该客户端

实现心跳

接下来将会展示利用Netty提供的 IdleStateHandler实现心跳机制,这里主要针对服务端,而客户端的主要工作是向服务端发送消息

这里主要介绍 IdleStateHandler,IdleStateHandler需要四个参数,分别是:

  • 读空闲:在指定时间间隔内没有从Channel中读到数据,将会创建状态为READER_IDLE的IdleStateEvent对象。
  • 写空闲:在指定时间间隔内没有数据写⼊到Channel中,将会创建状态为WRITER_IDLE的IdleStateEvent对象。
  • 读写空闲:在指定时间间隔内Channel中没有发⽣读写操作,将会创建状态为ALL_IDLE的IdleStateEvent对象。
  • 时间单位:空闲时间单位

    如在接下来的示例中,我们对IdleStateHandler的参数分别为3,0,0,秒; 因为这里主要模仿的是客户端向服务端发送消息,服务端需要的是读空闲,并不涉及服务端向客户端发消息所以不要写空闲

  1. 服务端
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    public class NettyServer {

    public static void main(String[] args) {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(bossGroup,workerGroup)
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast(new StringEncoder());
    pipeline.addLast(new StringDecoder());
    /*
    IdleStateHandler处理器会在服务端发现有超过3秒没有发生读操作的话会触发超时事件
    创建出IdleStateEvent对象,将该对象交给下一个Handler
    */
    pipeline.addLast(new IdleStateHandler(3,0,0, TimeUnit.SECONDS));
    //HeartbeatServerHandler必须重写userEventTriggered方法,用来做具体的超时的业务处理
    pipeline.addLast(new HeartbeatServerHandler());
    }
    });
    System.out.println("Netty服务端启动了");
    ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
    channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
    }
    }
    }
  2. 业务Handler

    在IdleStateHandler被成功执行后(你问我怎么才算成功执行,根据我们在服务端给定的参数,可知当客户端3秒内没有发来消息,就会被成功执行),接着会创建出IdleStateEvent对象,将该对象交给下一个Handler。且下一个Handler必须重写userEventTriggered方法。这里在userEventTriggered方法中的业务逻辑为获取IdleStateEvent对象,判断他是什么类型的超时,当连续超时次数达到3次时就会断开连接。
    当客户端在3秒内发来数据时,IdleStateHandler就不会被执行,会执行自定义Handler中的channelRead0方法,将超市次数置0。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    public class HeartbeatServerHandler extends SimpleChannelInboundHandler<String> {
    //读操作的超时次数
    int readIdleTimes = 0;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    readIdleTimes = 0;
    System.out.println("服务端收到的心跳:"+msg);
    ctx.writeAndFlush("服务端已经收到心跳");
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    IdleStateEvent event = (IdleStateEvent) evt;
    switch (event.state()){
    case READER_IDLE:
    readIdleTimes++;
    System.out.println("读超时");
    break;
    case WRITER_IDLE:
    System.out.println("写超时");
    break;
    case ALL_IDLE:
    System.out.println("读写超时");
    break;

    }
    //如果超时次数>3 则关闭客户端的连接
    if(readIdleTimes>3){
    System.out.println("读超时超过3次,关闭连接");
    ctx.writeAndFlush("超时关闭");
    ctx.channel().close();
    }
    }
    }

    断线重连

    断线重连发生在客户端中,分为两种情况,一是第一次连接失败,二是断线了.
  • 这里是将连接服务端代码抽取成了一个方法,当第一次连接失败后会采用递归的方式进行重连
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    public class NettyClient {

    private Bootstrap bootstrap;

    public Bootstrap getBootstrap() {
    return bootstrap;
    }

    public void setBootstrap(Bootstrap bootstrap) {
    this.bootstrap = bootstrap;
    }

    public static void main(String[] args) {
    EventLoopGroup group = new NioEventLoopGroup();
    Bootstrap bootstrap = new Bootstrap();
    NettyClient nettyClient = new NettyClient();
    nettyClient.setBootstrap(bootstrap);
    bootstrap.group(group)
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    //添加业务处理器,解决连接成功后的服务器断开连接的重连动作
    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast(new NettyReconnectClientHandler(nettyClient));
    }
    });
    //第一次的连接
    nettyClient.connect();
    //阻塞
    try {
    System.in.read();
    } catch (IOException e) {
    e.printStackTrace();
    }

    }

    public void connect(){
    //连接服务器
    ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090);
    //异步的知道连接的结果
    channelFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
    boolean result = future.isSuccess();
    if(!result){
    //连接失败==》在一定的时间间隔以后重新连接
    future.channel().eventLoop().schedule(()->{
    System.out.println("重新连接中...");
    connect();
    },2000, TimeUnit.MILLISECONDS);
    }else{
    //连接成功
    System.out.println("连接服务器成功");
    }
    }
    });
    }
    }
  • 自定义Handler实现断线重连
    重写 channelInactive方法(该方法会在断开连接后调用),在该方法中调用封装的
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    public class NettyReconnectClientHandler extends SimpleChannelInboundHandler<String> {

    private NettyClient nettyClient;

    public NettyReconnectClientHandler(NettyClient nettyClient) {
    this.nettyClient = nettyClient;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    System.out.println("收到服务端的数据:"+msg);
    }

    /**
    * 断线后会调用此方法,重新连接
    * @param ctx
    * @throws Exception
    */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    System.out.println("服务器断线重连中...");
    nettyClient.connect();
    }
    }

    暂时完结!

    Netty暂时了解这么多,后续有需要会接着学习零拷贝和源码。

完结个蛋完结,一堆没学服了。

本来是为了仿微信的项目的而去学的netty,发现学的netty知识不够,回来继续补

知识补充

netty的http

  1. 由于我在做仿微信项目,所以请求其实是从前端通过http协议发送过来的,所以首先需要学习netty为我们提供的httpde编解码器
类型编码器解码器备注
服务端处理http请求HttpResponseEncoderHttpRequestDecoderHttpServerCodec整合了编解码器
客户端处理http请求HttpResponseEncoderHttpRequestDecoderHttpClientCodec整合了编解码器

http的编解码也一样,在通道中直接添加即可,编解码器介绍完了,当然http请求也会发生类似粘包拆包的情况,所以下一个问题就是解决http请求发来的消息不完整的问题

  1. HTTP消息聚合
  • 处理HTTP时可能接收HTTP消息片段,Netty需要缓冲直到接收完整个消息。要完成的处理HTTP消息,并且内存开销也不会很大,Netty为此提供了HttpObjectAggregator。
  • 通过HttpObjectAggregator,Netty可以聚合HTTP消息,使用FullHttpResponse和FullHttpRequest到ChannelPipeline中的下一个ChannelHandler,这就消除了断裂消息,保证了消息的完整。
    [你问我什么是FullHttpResponse,FullHttpRequest]

    http协议中包含发送请求和响应请求为一次完整的过程,当添加HttpObjectAggregator的时候,netty会自动将请求解析为两部分,一部分是请求头,一部分为请求体。利用这一个特点,可以实现校验,让前端发送的请求需要携带token,这样netty就可以获得token,并校验

WebSocket

刚才说到了http协议的请求经过一次发送和响应就结束,如果需要实现长连接聊天功能就需要让客户端不断地发http请求,这个方法明显太拉跨,所以webSocket出现了, WebSocket允许数据双向传输,而不需要请求-响应模式。
如何使用WebSocket 这就要讲到netty为我们提供的 WebSocketServerProtocolHandler

WebSocketServerProtocolHandler

WebSocketServerProtocolHandler 是 Netty 提供的一个 ChannelHandler,用于处理WebSocket协议。它负责升级HTTP请求到WebSocket连接,并且管理WebSocket连接的生命周期。这意味着它能够自动处理握手过程,以及后续的数据帧的编解码工作,使得开发者可以专注于业务逻辑的实现

  • 简单使用
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // 添加 WebSocketServerProtocolHandler
    pipeline.addLast(new WebSocketServerProtocolHandler(
    "/ws", // WebSocket 的路径
    null, // Subprotocols (null 表示没有子协议)
    true, // Allow extensions (true 表示允许扩展)
    65536 * 10 // Max frame payload length
    ));

    // 添加自己的 ChannelHandler 来处理 WebSocket 消息
    pipeline.addLast(new TextWebSocketFrameHandler());

Attribute

Attribute 他的底层应该是一个Map,Attribute使用来给channel绑定参数的,一个channel可以绑定多个参数。
例如: 在一个Handler通过Attribute给channel设置了多个参数,传递给下一个Handler。

1
2
3
4
5
6
7
8
9
10
11
12
13
// 获取当前channel的id
String channelId = channel.id().toString();
//创建 AttributeKey
AttributeKey attributeKey = null;
if (!AttributeKey.exists(channelId)) {
// 如果AttributeKey中不存在 当前channel的id 则创建一个
attributeKey = AttributeKey.newInstance(channel.id().toString());
} else {
//如果存在,则获取这个AttributeKey
attributeKey = AttributeKey.valueOf(channel.id().toString());
}
//查找名为 channel的id 的AttributeKey,给他设置值
channel.attr(attributeKey).set(userId);