Netty编码器、解码器

在网络传输的过程中,数据都是以字节流的方式进行传递。客户端在向服务端发送数据的时候,将业务中其他类型数据转化为字节,叫编码。服务端接收到数据为字节流,将字节流转化为原来的格式,叫解码。统称codec。

编解码器分为两部分-编码器和解码器,编码器负责出站,解码器负责入站。

在前面的学习中,我们的数据发送和接收都是需要以ByteBuf形式传输,但是这样是不是有点太不方便了,咱们能不能参考一下JavaWeb那种搞个Filter,在我们开始处理数据之前,过过滤一次,并在过滤的途中将数据转换成我们想要的类型,也可以将发出的数据进行转换,这就要用到编码解码器了。

我们先来看看最简的,字符串,如果我们要直接在客户端或是服务端处理字符串,可以直接添加一个字符串解码器到我们的流水线中:

使用了编码器解码器的客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package org.wsy.encoder;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class Client {
public static void main(String[] args) {
Bootstrap bootstrap = new Bootstrap();
bootstrap
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new StringDecoder()) //解码器安排
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(">> 接收到客户端发送的数据:" + msg); //直接接收字符串
}
})
.addLast(new StringEncoder()); //编码器安排
}
});
Channel channel = bootstrap.connect("localhost", 8080).channel();
try(Scanner scanner = new Scanner(System.in)){
while (true) {
System.out.println("<< 请输入要发送给服务端的内容:");
String text = scanner.nextLine();
if(text.isEmpty()) continue;
channel.writeAndFlush(text); //直接发送字符串就行
}
}
}
}

使用了编码器解码器的服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package org.wsy.encoder;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.nio.charset.StandardCharsets;

public class NettyEncoder {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup(1); //线程数先限制一下
EventLoopGroup handlerGroup = new DefaultEventLoopGroup(); //使用DefaultEventLoop来处理其他任务
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) {
channel.pipeline()
//解码器本质上也算是一种ChannelInboundHandlerAdapter,用于处理入站请求
.addLast(new StringDecoder())
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("收到客户端的数据:"+msg);
ctx.channel().writeAndFlush("可以,不跟你多BB"); //直接发字符串回去
}
})
.addLast(new StringEncoder()); //使用内置的StringEncoder可以直接将出站的字符串数据编码成ByteBuf
}
});
ChannelFuture future = bootstrap.bind(8080);
// future.sync(); //让当前线程同步等待任务完成
System.out.println("服务端启动状态:"+future.isDone());
System.out.println("我是服务端启动完成之后要做的事情!");
}
}

之前传递的都是这个

1
Unpooled.wrappedBuffer("已收到!".getBytes())

我们在一开始提到的粘包/拆包问题,也可以使用一个解码器解决:

1
2
3
4
channel.pipeline()
.addLast(new FixedLengthFrameDecoder(10))
//第一种解决方案,使用定长数据包,每个数据包都要是指定长度
...

特定的分隔符

1
2
3
4
channel.pipeline()
.addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.wrappedBuffer("---".getBytes())))
//第二种,就是指定一个特定的分隔符,比如我们这里以感叹号为分隔符
//在收到分隔符之前的所有数据,都作为同一个数据包的内容

添加固定的长度

1
2
3
4
5
channel.pipeline()
.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4))
//第三种方案,就是在头部添加长度信息,来确定当前发送的数据包具体长度是多少
//offset是从哪里开始,length是长度信息占多少字节,这里是从0开始读4个字节表示数据包长度
.addLast(new StringDecoder())

Http协议

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package org.wsy.http;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.nio.charset.StandardCharsets;

public class NettyEncoder {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup(1); //线程数先限制一下
EventLoopGroup handlerGroup = new DefaultEventLoopGroup(); //使用DefaultEventLoop来处理其他任务
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) {
channel.pipeline()
.addLast(new HttpRequestDecoder()) //Http请求解码器
.addLast(new HttpObjectAggregator(Integer.MAX_VALUE)) //搞一个聚合器,将内容聚合为一个FullHttpRequest,参数是最大内容长度
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
FullHttpRequest request = (FullHttpRequest) msg;
System.out.println("浏览器请求路径:"+request.uri()); //直接获取请求相关信息
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
response.content().writeCharSequence("Hello World!", StandardCharsets.UTF_8);
ctx.channel().writeAndFlush(response);
ctx.channel().close();
}
})
.addLast(new HttpResponseEncoder());
}
});
ChannelFuture future = bootstrap.bind(8080);
// future.sync(); //让当前线程同步等待任务完成
System.out.println("服务端启动状态:"+future.isDone());
System.out.println("我是服务端启动完成之后要做的事情!");
}
}

Netty:启动流程源码解析_哔哩哔哩_bilibili