NettY
初识Netty
Netty 是基于 Java NIO 的异步事件驱动的网络应用框架,使用 Netty 可以快速开发网络应用.Netty 是完全基于 NIO 实现的,所以整个 Netty 都是异步的。
Netty其实就是封装的NIO,用于简化NIO操作的。
Netty初体验
这里先简单看一下Netty,有很多不懂的地方会慢慢学习和介绍
- 导入依赖
1
2
3
4
5dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.35.Final</version>
</dependency> - 服务端
这是一个netty的服务端连接配置程序,第一步创建用于处理NIO类型的连接和读写请求的线程池,创建Netty启动对象,配置服务端和客户端的连接类型(NIO),编写具体的业务处理器。启动Netty。
- EventLoopGroup是线程池,他是继承线程池的
- serverBootstrap.bind(9090) 表示netty的启动,是异步的,加上sync()会进行阻塞
1 | |
- 服务端自定义Handler
这里是自定义的拦截器 需要去实现Netty框架中的 ChannelInboundHandlerAdapter- 这里用来处理服务端读写的操作,利用ChannelInboundHandlerAdapter提供的方法实现 读取数据 和写入数据操作
- ByteBuf 是netty提供的抽象类,他是对bytebuff的封装,使我们更容易操作bytebuff
- NIO 中使用的是 bytebuff,Netty封装成了bytebuf。

1 | |
客户端
客户端的代码逻辑和服务端整体相同,不同的地方:- 客户端的启动对象是 bootstrap 而不是 servicebootstrap
- 客户端启动需要指定 服务端ip
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38public class NettyClient {
public static void main(String[] args) {
//创建一个线程组用于事件循环
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
//创建客户端启动对象
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(eventLoopGroup)
//使用NioSocketChannel作为客户端的通道实现
.channel(NioSocketChannel.class)
//创建通道初始化对象并设置handler业务处理器
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//获得pipeline
ChannelPipeline pipeline = ch.pipeline();
//添加解码器handler
//pipeline.addLast(new StringDecoder());
//添加编码器handler
//pipeline.addLast(new StringEncoder());
// pipeline.addLast(new ObjectEncoder());
//添加处理器,处理器里面是实现具体业务的
pipeline.addLast(new NettyClientHandler());
}
});
System.out.println("Netty客户端启动了");
//同步阻塞地告知客户端连接的服务器的地址,并启动客户端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090).sync();
//阻塞等待完成操作后关闭通道
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
客户端自定义Handler
客户端需要实现一个 channelActive方法,该方法会在客户端连接上服务端后自动调用1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 当客户端完成连接服务器后调用该方法:向服务端写数据
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//使用protostuff编码对象
ByteBuf buf = Unpooled.copiedBuffer("hello service".getBytes(StandardCharsets.UTF_8));
ctx.writeAndFlush(buf);
}
/**
* 当通道有读事件发生时调用的方法:读取服务器返回的数据
* @param ctx
* @param msg
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("来自服务器"+ctx.channel().remoteAddress()+"的消息:"+msg);
}
/**
* @param ctx
* @param cause
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
NIO基础概念补充
Netty的底层实现都是由NIO来实现的,没有学过NIO,去学Netty还是有点吃力,来补充一下NIO的基础概念
首先看NIO的实现原理图:
客户端和服务端会先建立连接,然后通过各自的Channel注册到Selector(多路复用器中),当某一个客户端有读写请求时,Selector会在已经注册的Channel中根据SelectionKey查找具体时哪一个客户端的Channel发起的请求,并进行处理。
SelectionKey 是Channel在Selector中注册后,Selector为Channel生成的标识 类似于id

epoll
当某一个客户端有请求时,Selector会查找该客户端的Channel,查找的方法有三种:
- Selector.select()
- Selector.poll()
- Selector.epoll()
| 查找方法 | 操作方式 | 底层实现 | IO效率 | 最大值 |
|---|---|---|---|---|
| select | 遍历 | 数组 | 遍历数组中的所有Channel,效率差 | 有上限 |
| poll | 遍历 | 链表 | 遍历来链表中的Channel,效率差 | 无上限 |
| epoll | 回调 | 哈希表 | 操作系统回调就绪状态事件 | 无上限 |
Netty基础知识
Netty线程模型
当有客户端向服务端连接时,BossGroup中的NioEventLoop中的Selector会调用select方法并为该客户端生成一个 processSelectedKeys ,并注册到Worker Group中,此时客户端和服务端会建立一个Channel(通道),标识为 processSelectedKeys,且注册到了Worker Group,当客户端有读写请求时,Worker Group会调用select,根据processSelectedKeys找出是哪个通道发起的读写请求,并交给Pipeline处理具体的读写业务逻辑
Bootstrap
Bootstrap是Netty的启动程序,⼀个Netty应⽤通常由⼀个Bootstrap开始。Bootstrap的主要作⽤是配置Netty程序,串联Netty的各个组件。
- Bootstrap: 客户端的启动程序
- ServerBootstrap:服务端启动程序
例如:
1 | |
Future和ChannelFuture
Netty的所有操作都是异步的,即不能⽴刻得知消息是否被正确处理。因此需要通过Future和ChannelFuture来注册监听器,当操作执⾏成果或者失败来调⽤具体的监听器。
- Future通过sync⽅法来获得同步执⾏的效果。
- ChannelFuture是Future的⼦类,提供了针对于Channel的异步监听操作
ChannelFuture 提供了多种监听器的使用,例如在服务端代码进行改造结果如下, ====比启动成功先输出,说明了程序的异步的效果,并监听到了Netty启动成功1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16//注释的代码是阻塞的,我们不用阻塞的方法,然后添加监听器,并体会异步
// ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
ChannelFuture channelFuture = serverBootstrap.bind(9090);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()){
System.out.println("启动成功");
}else {
System.out.println("启动失败");
}
}
});
System.out.println("=========");
//只要服务没关闭,该方法会一直阻塞
channelFuture.channel().closeFuture().sync();
Channel
Channel是Netty⽹络通信的重要组件,⽤于执⾏⽹络IO操作.Channel具备以下能⼒:Channel更像是用来描述 客户端和服务端连接的通道,通道中有很多的Handler,其中包括我们自定义的Handler 用于处理业务
- 获得当前⽹络连接通道的状态
- ⽹络连接的配置参数
- 提供异步的⽹络IO操作,⽐如建⽴连接、绑定端⼝、读写操作等
- 获得ChannelFuture实例,并注册监听器到ChannelFuture上,⽤于监听IO操作的成功、失败、取消时的事件回调。
Channel具体的实现类有以下⼏种: - NioSocketChannel: 异步的客户端TCP Socket连接通道
- NioServerSocketChannel:异步的服务端TCP Socket连接通道
- NioDatagramChannel:异步的UDP连接通道
- NioSctpChannel:异步的客户端Sctp连接通道
- NioSctpServerChannel:异步的服务端Sctp连接通道
ChannelHandler
ChannelHandler⽤于处理拦截IO事件,往往在ChannelHandler中可以加⼊业务处理逻辑。ChannelHandler执⾏完后会将IO事件转发到ChannelPipeline中的下⼀个处理程序。信息提示 我们自定义的Handler其实就是 ChannelHandler的子类
ChannelPipeline
ChannelPipeline是⼀个双向链表,其中保存着多个ChannelHandler。ChannelPipeline实现了⼀种⾼级形式的过滤器模式,在IO操作时发⽣的⼊站和出站事件,会导致ChannelPipeline中的多个ChannelHandler被依次调⽤
Netty的编解码
编解码是什么?例如 客户端向服务端发送了一句 “你好,服务端”,但这句话在传输过程中是以二进制形式进行传输的,所以在发送之前会进行编码,将这句话编码成二进制,服务端所收到的消息也是二进制的,所以需要解码成 “你好,服务端”。入站与出站
ChannelHandler⽤于处理⼊站和出站数据。 - ChannelHandler的实现类ChannelInboundHandlerAdapter表示⼊站程序
- ChannelHandler的实现类ChannelOutboundHandlerAdapter表示出站程序。
- ⼀个Channel包含⼀个ChannelPipeline,⽽ChannelPipeline维护着由多个ChannelHandlerContext组成的双向链表。且每个ChannelHandlerContext内包含⼀个ChannelHandler
⼊站:⽆论是客户端还是服务端,对应着ChannelPipeline中链表的tail尾部。如
果是接收数据,则为⼊站事件。即数据会从head到tail的过程,会依次经历多个
ChannelHandler。
出站:如果发⽣的是发送数据的事件,则数据会从tail尾部发送到head头部,这
个过程中会先后经历多个ChannelHandler编解码器
| 类型 | 编码器 | 解码器 | 备注 |
|---|---|---|---|
| String | StringEncoder | StringDncoder | |
| Object | ObjectEncoder | ObjectDncoder | |
| 服务端处理http请求 | HttpResponseEncoder | HttpRequestDecoder | HttpServerCodec整合了编解码器 |
| 客户端处理http请求 | HttpResponseEncoder | HttpRequestDecoder | HttpClientCodec整合了编解码器 |
String类型编解码器适用于字符串。
Object类型的编码器可以用于对象类型。但是对象类型必须实现序列化
我们来看ObjectEncoder的源码分析
- 可以看到encode编码方法需要的是 Serializable类型的msg,所以所传对象需要实现Serializable。如何配置编码器,很简单在Pipeline中添加即可
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30@Sharable
public class ObjectEncoder extends MessageToByteEncoder<Serializable> {
private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
public ObjectEncoder() {
}
protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {
int startIdx = out.writerIndex();
ByteBufOutputStream bout = new ByteBufOutputStream(out);
ObjectOutputStream oout = null;
try {
bout.write(LENGTH_PLACEHOLDER);
oout = new CompactObjectOutputStream(bout);
oout.writeObject(msg);
oout.flush();
} finally {
if (oout != null) {
oout.close();
} else {
bout.close();
}
}
int endIdx = out.writerIndex();
out.setInt(startIdx, endIdx - startIdx - 4);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//获得pipeline
ChannelPipeline pipeline = ch.pipeline();
//添加解码器handler
// pipeline.addLast(new StringDecoder());
pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
//添加编码器handler
pipeline.addLast(new ObjectEncoder());
//添加业务处理handler
pipeline.addLast(new NettyServerHandler());
}
});
System.out.println("Netty服务端启动了");Protostuff提升编解码速度
- 引入依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16<!--protostuff-->
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-api</artifactId>
<version>1.0.10</version>
</dependency>
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>1.0.10</version>
</dependency>
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>1.0.11</version>
</dependency> - 使用Protostuff工具提升编解码,就不再需要在Pipeline中添加编解码大的handler了,而是在对应的发送接收消息的方法中使用
例如在客户端发消息的方法中:1
2
3
4
5
6
7
8
9/**
* 当客户端完成连接服务器后调用该方法:向服务端写数据
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//使用protostuff编码对象
ByteBuf buf = Unpooled.copiedBuffer(ProtostuffUtil.serialize(new Student(1001L,"小明")));
ctx.writeAndFlush(buf);
}Netty 实现简单群聊
服务端
服务端用来接收客户端所发来的请求,当客户端连上服务端时,会通知其他客户端,有客户端上线了 - 第一步编写服务端配置代码 这一步配置代码几乎时固定不变的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33public class ChatServer {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
//配置参数
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//获得pipeline
ChannelPipeline pipeline = ch.pipeline();
//添加handler
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
//添加业务处理handler
pipeline.addLast(new ChatServerHandler());
}
});
System.out.println("聊天室启动了...");
ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
} - 第二步编写Handler
这一步也是最重要的一步。这里的自定义的Handler是实现的 SimpleChannelInboundHandler 而不是之前提到的ChannelInboundHandlerAdapter。其实并没有很大区别,因为SimpleChannelInboundHandler是ChannelInboundHandlerAdapter的子类。
SimpleChannelInboundHandler在ChannelInboundHandlerAdapter的基础上重写了 channelRead方法,会在channelRead结束后清空内存空间。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63public class ChatServerHandler extends SimpleChannelInboundHandler<String> {
private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//存放Channel的容器,而且可以执行对每个channel进行操作的任务,任务由GlobalEventExecutor提供的单线程来执行的。
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 有新的客户端(channel)连接了,将该客户端的上线信息广播给所有其他的客户端
*
* @param ctx
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//得到客户端的channel,
Channel channel = ctx.channel();
String message = "客户端-" + channel.remoteAddress() + "于" + sdf.format(new Date()) + "上线了\n";
System.out.println(message);
//得到其他客户端的channel,向其他客户端发送该客户端的上线信息
channelGroup.writeAndFlush(message);
//加入到channelGroup中
channelGroup.add(channel);
}
/**
* 客户端下线则广播给其他客户端
*
* @param ctx
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
//生成一个下线的信息
String message = "客户端-" + channel.remoteAddress() + "下线了\n";
System.out.println(message);
//广播给其他客户端
channelGroup.writeAndFlush(message);
}
/**
* 具体的读数据的业务
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
//获得当前发消息的客户端的channel
Channel channel = ctx.channel();
//遍历所有的channel
channelGroup.forEach(ch -> {
if (channel != ch) {
//发给其他客户端
ch.writeAndFlush("客户端-" + channel.remoteAddress() + "于" + sdf.format(new Date()) + "说:"
+ msg + "\n"+"_");
}else{
//发给当前发消息的客户端(自己)
ch.writeAndFlush("我于"+ sdf.format(new Date()) + "说:"
+ msg + "\n"+"_");
}
});
}
}客户端
客户端需要实现的就是发消息和接收消息这里的发消息并不是写在Handler里的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34public class ChatClient {
public static void main(String[] args) {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new ChatClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090).sync();
Channel channel = channelFuture.channel();
//发送消息
System.out.println("欢迎进入小怪兽聊天室");
Scanner scanner = new Scanner(System.in);
while(scanner.hasNextLine()){
String message = scanner.nextLine();
//发送消息到channel
channel.writeAndFlush(message);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
} - 编写Handler
1
2
3
4
5
6
7
8
9
10
11
12
13public class ChatClientHandler extends SimpleChannelInboundHandler<String> {
/**
* 打印消息在控制台
* @param ctx
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg.trim());
}
}最终结果

粘包和拆包
- 粘包
缓冲区数据量满了就会作为整体来发送,⽽这个整体中包含了多条业务数据,那这种情况就是粘包 - 拆包
在缓冲区数据量满的时候,把⼀条数据分成了两次缓冲区发送,这种情况就是拆包。例如我们让客户端2号一直发送小怪兽,会发现很多小怪兽会连在一块发送,这就是粘包简单来说 粘包就是多条数据被当成一条数据发送出去了,拆包就是一条数据被当成多条数据发送出去了

粘包和拆包的解决方法
其实解决方法也很简单,无非是传送数据时,不知道每条数据的开始和结束,我们让netty知道每条数据的开始和结束即可
Netty为粘包和拆包提供了多个解码器,每个解码器配有相应的分包解决⽅案
- LineBasedFrameDecoder:回⻋换⾏分包,以回⻋换⾏为分包的依据
1
pipeline.addLast(new LineBasedFrameDecoder(1024)) - DelimiterBasedFrameDecoder:特殊分隔符分包,以指定的特殊分隔符为分包依据,局限是消息内容中不能出现特殊分隔符
1
pipeline.addLast(new DelimiterBasedFrameDecoder(1024,Unpooled.copiedBuffer("_".getBytes()))); - FixedLengthFrameDecode:固定⻓度报⽂分包,消息⻓度被指定,不⾜的以空格补⾜。
1
pipeline.addLast(new FixedLengthFrameDecoder(1024)) - ⾃定义分包
⾃定义分包往往在发送每条数据的时候,将数据的⻓度⼀并发送。⽐如发送的数据的前4个字节⽤来表示消息的实际⻓度,之后根据消息的实际⻓度来获得具体的数据自定义分包拦截器
在自定义分包中,其实就是将消息分为两部分,前一部分代表消息占多少个字节,后一部分表示真正的消息,这样在解码器中,我们读取到消息的长度,就可以准确的发送,避免了粘包和拆包
- 消息:客户端发送的消息内容
- 消息协议:这⼀次消息需要包含两个部分,即消息⻓度和消息内容本身。
- ⾃定义消息编码器:消息编码器将客户端发送的消息转换成遵守消息协议的消息,即包含消息⻓度和消息内容的消息
- ⾃定义消息解码器:消息解码器根据消息协议的消息⻓度,来获得指定⻓度的消息内容

自定义消息协议
这里定义了一个MessageProtocol实体类用来存储消息的长度和消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22public class MessageProtocol {
//消息的长度
private int length;
//消息的内容
private byte[] content;
public int getLength() {
return length;
}
public void setLength(int length) {
this.length = length;
}
public byte[] getContent() {
return content;
}
public void setContent(byte[] content) {
this.content = content;
}
}再来改进一下客户端的配置代码
这里自定义一个业务处理器和一个编码器。当然还可以添加自定义的解码器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31public class NettyClient {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup(1);
try {
Bootstrap bootstrap = new Bootstrap();
//设置相关的参数
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//添加处理器,分包编码器
pipeline.addLast(new MessageEncoder());
//添加具体的业务处理器
pipeline.addLast(new NettyMessageClientHandler());
}
});
System.out.println("客户端启动了");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
} - 自定义的业务处理器
在 channelActive 方法中实现的客户端连上服务端后具体的业务,这里实现一个简单的客户端连上服务端后 发送两百条“小怪兽”的消息,
首先就需要将String字符串转成字节,(为什么转成字节,那当然是因为netty中消息在通道中是以byte的形式传送,所以前面才会讲到编解码器,这里转成字节,是为了获取到消息转换成字节的长度),然后将字节形式的消息封装到MessageProtocol中1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public class NettyMessageClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {
//连接通道创建后要向服务端发送消息
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 200; i++) {
String msg = "小怪兽";
//创建消息协议对象
MessageProtocol messageProtocol = new MessageProtocol();
//封装长度和内容
byte[] content = msg.getBytes(StandardCharsets.UTF_8);
messageProtocol.setLength(content.length);
messageProtocol.setContent(content);
//发送消息协议对象,注意此时ctx只能发送Bytebuf数据,因此需要用编码器把它编码成Bytebuf数据
ctx.writeAndFlush(messageProtocol);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
}
}自定义编码器
这样客户端的消息就发出去了,消息发出去后会经过编码器,这里需要继承MessageToByteEncoder类并指定泛型什么?你问我为什么要继承MessageToByteEncoder,因为他是专门为了自定义编码器而生的 !在这个类里面为我们封装好了encode 编码方法,他很只能的为我们提供了out.writeInt方法用来封装消息的长度, out.writeBytes封装消息。
其实在之前提到的StringEncoder,ObjectEncoder 也都实现了 MessageToMessageEncoder以上客户端的代码就简单完成了,再来看服务端1
2
3
4
5
6
7
8public class MessageEncoder extends MessageToByteEncoder<MessageProtocol> {
@Override
protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
out.writeInt(msg.getLength());
out.writeBytes(msg.getContent());
}
}服务端
服务端的更改也只是添加了一个业务拦截器和自定义解码器1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28public class NettyServer {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//添加解码器
pipeline.addLast(new MessageDecoder());
pipeline.addLast(new NettyMessageServerHandler());
}
});
System.out.println("Netty服务端启动了");
ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}自定义解码器
解码器就稍微复杂一些,客户端发来的消息经过编码器后最后会以ByteBuf的形式传到服务端的解码器,然后需要判断可读的字节数是否大于4,不大于4说明所传入的消息中并没有数据这时候就有人要问了,为什么要跟4判断,那当然是因为我们在 MessageProtocol实体类中创建用于存储消息长度的length是int类型的,int类型占4个字节
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42public class MessageDecoder extends ByteToMessageDecoder {
int length = 0;
/**
*
* @param ctx
* @param in 客户端发送来的MessageProtocol编码后的ByteBuf数据
* @param out out里的数据会被放行到下一个handler。把解码出来的MessageProtocol放到out里面
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("ByteBuf:"+in);
//获得前面4个字节的数据 == 描述实际内容的长度
if(in.readableBytes()>=4){
//ByteBuf里面可能有MessageProtocol数据
if(length == 0){
length = in.readInt();
}//length = 15
if(in.readableBytes()<length){
//说明数据还没到齐,等待下一次调用decode
System.out.println("当前数据量不够,继续等待");
return;
}
//可读数据量>=length ==> 意味着这一次的MessageProtocol中的内容已经到齐了
//创建一个指定length长度的字节数组
byte[] content = new byte[length];
//把ByteBuf里的指定长度的数据读到content数组中
in.readBytes(content);
//创建MessageProtocol对象并赋值
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLength(length);
messageProtocol.setContent(content);
//传递给下一个handler
out.add(messageProtocol);
//重置length
length = 0;
}
}
} - 业务Handler以上就是自定义编解码器 效果如下
1
2
3
4
5
6
7
8
9public class NettyMessageServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
System.out.println("---服务器收到的数据---");
System.out.println("消息的长度:"+msg.getLength());
System.out.println("消息的内容:"+new String(msg.getContent(), StandardCharsets.UTF_8));
}
}
心跳
Netty在TCP的⻓连接中,客户端定期向服务端发送⼀种特殊的数据包,告知对⽅⾃⼰正常在线,以确保TCP连接的有效性。Netty实现⼼跳的关键需要通过IdleStateHandler。
心跳机制就是为了确保对方正常在线,避免出现客户端虽然连上了服务端,但是长时间不进行操作,会占用浪费资源,而心跳机制就是检测客户端在一定时间内有没有发来数据,如果没有则会断掉该客户端
实现心跳
接下来将会展示利用Netty提供的 IdleStateHandler实现心跳机制,这里主要针对服务端,而客户端的主要工作是向服务端发送消息
这里主要介绍 IdleStateHandler,IdleStateHandler需要四个参数,分别是:
- 读空闲:在指定时间间隔内没有从Channel中读到数据,将会创建状态为READER_IDLE的IdleStateEvent对象。
- 写空闲:在指定时间间隔内没有数据写⼊到Channel中,将会创建状态为WRITER_IDLE的IdleStateEvent对象。
- 读写空闲:在指定时间间隔内Channel中没有发⽣读写操作,将会创建状态为ALL_IDLE的IdleStateEvent对象。
- 时间单位:空闲时间单位
如在接下来的示例中,我们对IdleStateHandler的参数分别为3,0,0,秒; 因为这里主要模仿的是客户端向服务端发送消息,服务端需要的是读空闲,并不涉及服务端向客户端发消息所以不要写空闲
- 服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35public class NettyServer {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
/*
IdleStateHandler处理器会在服务端发现有超过3秒没有发生读操作的话会触发超时事件
创建出IdleStateEvent对象,将该对象交给下一个Handler
*/
pipeline.addLast(new IdleStateHandler(3,0,0, TimeUnit.SECONDS));
//HeartbeatServerHandler必须重写userEventTriggered方法,用来做具体的超时的业务处理
pipeline.addLast(new HeartbeatServerHandler());
}
});
System.out.println("Netty服务端启动了");
ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
} - 业务Handler
在IdleStateHandler被成功执行后(你问我怎么才算成功执行,根据我们在服务端给定的参数,可知当客户端3秒内没有发来消息,就会被成功执行),接着会创建出IdleStateEvent对象,将该对象交给下一个Handler。且下一个Handler必须重写userEventTriggered方法。这里在userEventTriggered方法中的业务逻辑为获取IdleStateEvent对象,判断他是什么类型的超时,当连续超时次数达到3次时就会断开连接。
当客户端在3秒内发来数据时,IdleStateHandler就不会被执行,会执行自定义Handler中的channelRead0方法,将超市次数置0。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34public class HeartbeatServerHandler extends SimpleChannelInboundHandler<String> {
//读操作的超时次数
int readIdleTimes = 0;
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
readIdleTimes = 0;
System.out.println("服务端收到的心跳:"+msg);
ctx.writeAndFlush("服务端已经收到心跳");
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
switch (event.state()){
case READER_IDLE:
readIdleTimes++;
System.out.println("读超时");
break;
case WRITER_IDLE:
System.out.println("写超时");
break;
case ALL_IDLE:
System.out.println("读写超时");
break;
}
//如果超时次数>3 则关闭客户端的连接
if(readIdleTimes>3){
System.out.println("读超时超过3次,关闭连接");
ctx.writeAndFlush("超时关闭");
ctx.channel().close();
}
}
}断线重连
断线重连发生在客户端中,分为两种情况,一是第一次连接失败,二是断线了.
- 这里是将连接服务端代码抽取成了一个方法,当第一次连接失败后会采用递归的方式进行重连
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60public class NettyClient {
private Bootstrap bootstrap;
public Bootstrap getBootstrap() {
return bootstrap;
}
public void setBootstrap(Bootstrap bootstrap) {
this.bootstrap = bootstrap;
}
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
NettyClient nettyClient = new NettyClient();
nettyClient.setBootstrap(bootstrap);
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//添加业务处理器,解决连接成功后的服务器断开连接的重连动作
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new NettyReconnectClientHandler(nettyClient));
}
});
//第一次的连接
nettyClient.connect();
//阻塞
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
public void connect(){
//连接服务器
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090);
//异步的知道连接的结果
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
boolean result = future.isSuccess();
if(!result){
//连接失败==》在一定的时间间隔以后重新连接
future.channel().eventLoop().schedule(()->{
System.out.println("重新连接中...");
connect();
},2000, TimeUnit.MILLISECONDS);
}else{
//连接成功
System.out.println("连接服务器成功");
}
}
});
}
} - 自定义Handler实现断线重连
重写 channelInactive方法(该方法会在断开连接后调用),在该方法中调用封装的1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24public class NettyReconnectClientHandler extends SimpleChannelInboundHandler<String> {
private NettyClient nettyClient;
public NettyReconnectClientHandler(NettyClient nettyClient) {
this.nettyClient = nettyClient;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("收到服务端的数据:"+msg);
}
/**
* 断线后会调用此方法,重新连接
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("服务器断线重连中...");
nettyClient.connect();
}
}暂时完结!
Netty暂时了解这么多,后续有需要会接着学习零拷贝和源码。
完结个蛋完结,一堆没学服了。
本来是为了仿微信的项目的而去学的netty,发现学的netty知识不够,回来继续补
知识补充
netty的http
- 由于我在做仿微信项目,所以请求其实是从前端通过http协议发送过来的,所以首先需要学习netty为我们提供的httpde编解码器
| 类型 | 编码器 | 解码器 | 备注 |
|---|---|---|---|
| 服务端处理http请求 | HttpResponseEncoder | HttpRequestDecoder | HttpServerCodec整合了编解码器 |
| 客户端处理http请求 | HttpResponseEncoder | HttpRequestDecoder | HttpClientCodec整合了编解码器 |
http的编解码也一样,在通道中直接添加即可,编解码器介绍完了,当然http请求也会发生类似粘包拆包的情况,所以下一个问题就是解决http请求发来的消息不完整的问题
- HTTP消息聚合
- 处理HTTP时可能接收HTTP消息片段,Netty需要缓冲直到接收完整个消息。要完成的处理HTTP消息,并且内存开销也不会很大,Netty为此提供了HttpObjectAggregator。
- 通过HttpObjectAggregator,Netty可以聚合HTTP消息,使用FullHttpResponse和FullHttpRequest到ChannelPipeline中的下一个ChannelHandler,这就消除了断裂消息,保证了消息的完整。
[你问我什么是FullHttpResponse,FullHttpRequest]
http协议中包含发送请求和响应请求为一次完整的过程,当添加HttpObjectAggregator的时候,netty会自动将请求解析为两部分,一部分是请求头,一部分为请求体。利用这一个特点,可以实现校验,让前端发送的请求需要携带token,这样netty就可以获得token,并校验
WebSocket
刚才说到了http协议的请求经过一次发送和响应就结束,如果需要实现长连接聊天功能就需要让客户端不断地发http请求,这个方法明显太拉跨,所以webSocket出现了, WebSocket允许数据双向传输,而不需要请求-响应模式。
如何使用WebSocket 这就要讲到netty为我们提供的 WebSocketServerProtocolHandler
WebSocketServerProtocolHandler
WebSocketServerProtocolHandler 是 Netty 提供的一个 ChannelHandler,用于处理WebSocket协议。它负责升级HTTP请求到WebSocket连接,并且管理WebSocket连接的生命周期。这意味着它能够自动处理握手过程,以及后续的数据帧的编解码工作,使得开发者可以专注于业务逻辑的实现
- 简单使用
1
2
3
4
5
6
7
8
9
10
11// 添加 WebSocketServerProtocolHandler
pipeline.addLast(new WebSocketServerProtocolHandler(
"/ws", // WebSocket 的路径
null, // Subprotocols (null 表示没有子协议)
true, // Allow extensions (true 表示允许扩展)
65536 * 10 // Max frame payload length
));
// 添加自己的 ChannelHandler 来处理 WebSocket 消息
pipeline.addLast(new TextWebSocketFrameHandler());
Attribute
Attribute 他的底层应该是一个Map,Attribute使用来给channel绑定参数的,一个channel可以绑定多个参数。
例如: 在一个Handler通过Attribute给channel设置了多个参数,传递给下一个Handler。
1 | |






