Netty工作模型

Netty为我们提供的更高级的缓冲区类,我们接着来看看Netty是如何工作的,上一章我们介绍了Reactor模式,而Netty正是以主从Reactor多线程模型为基础,构建出了一套高效的工作模型。

  • Netty 抽象出两组线程池BossGroup和WorkerGroup,BossGroup专门负责接受客户端的连接, WorkerGroup专门负读写,就像我们前面说的主从Reactor一样。
  • 无论是BossGroup还是WorkerGroup,都是使用EventLoop(事件循环,很多系统都采用了事件循环机制,比如前端框架Node.js,事件循环顾名思义,就是一个循环,不断地进行事件通知)来进行事件监听的,整个Netty也是使用事件驱动来运作的,比如当客户端已经准备好读写、连接建立时,都会进行事件通知,说白了就像我们之前写NIO多路复用那样,只不过这里换成EventLoop了而已,它已经帮助我们封装好了一些常用操作,而且我们可以自己添加一些额外的任务,如果有多个EventLoop,会存放在EventLoopGroup中,EventLoopGroup就是BossGroup和WorkerGroup的具体实现。
  • 在BossGroup之后,会正常将SocketChannel绑定到WorkerGroup中的其中一个EventLoop上,进行后续的读写操作监听。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static void main(String[] args) {
//这里我们使用NioEventLoopGroup实现类即可,创建BossGroup和WorkerGroup
//当然还有EpollEventLoopGroup,但是仅支持Linux,这是Netty基于Linux底层Epoll单独编写的一套本地实现,没有使用NIO那套
EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup();

//创建服务端启动引导类
ServerBootstrap bootstrap = new ServerBootstrap();
//可链式,就很棒
bootstrap
.group(bossGroup, workerGroup) //指定事件循环组
.channel(NioServerSocketChannel.class) //指定为NIO的ServerSocketChannel
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) {
//获取流水线,当我们需要处理客户端的数据时,实际上是像流水线一样在处理,这个流水线上可以有很多Handler
channel.pipeline().addLast(new ChannelInboundHandlerAdapter(){ //添加一个Handler,这里使用ChannelInboundHandlerAdapter
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { //ctx是上下文,msg是收到的消息,默认以ByteBuf形式(也可以是其他形式,后面再说)
ByteBuf buf = (ByteBuf) msg; //类型转换一下
System.out.println(Thread.currentThread().getName()+" >> 接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
//通过上下文可以直接发送数据回去,注意要writeAndFlush才能让客户端立即收到
ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()));
}
});
}
});
//最后绑定端口,启动
bootstrap.bind(8080);
}

Alt+7查看所有的方法,Crtl+H是查看类的继承关系

Channel详解

在学习NIO时,我们就已经接触到Channel了,我们可以通过通道来进行数据的传输,并且通道支持双向传输。

而在Netty中,也有对应的Channel类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
ChannelId id(); //通道ID
EventLoop eventLoop(); //获取此通道所属的EventLoop,因为一个Channel在它的生命周期内只能注册到一个EventLoop中
Channel parent(); //Channel是具有层级关系的,这里是返回父Channel
ChannelConfig config();
boolean isOpen(); //通道当前的相关状态
boolean isRegistered();
boolean isActive();
ChannelMetadata metadata(); //通道相关信息
SocketAddress localAddress();
SocketAddress remoteAddress();
ChannelFuture closeFuture(); //关闭通道,但是会用到ChannelFuture,后面说
boolean isWritable();
long bytesBeforeUnwritable();
long bytesBeforeWritable();
Unsafe unsafe();
ChannelPipeline pipeline(); //流水线,之后也会说
ByteBufAllocator alloc(); //可以直接从Channel拿到ByteBufAllocator的实例,来分配ByteBuf
Channel read();
Channel flush(); //刷新,基操
}

可以看到,Netty中的Channel相比NIO功能就多得多了。Netty中的Channel主要特点如下:

  • 所有的IO操作都是异步的,并不是在当前线程同步运行,方法调用之后就直接返回了,那怎么获取操作的结果呢?还记得我们在前面JUC篇教程中学习的Future吗,没错,这里的ChannelFuture也是干这事的。

我们可以来看一下Channel接口的父接口ChannelOutboundInvoker接口,这里面定义了大量的I/O操作

我们了解了Netty底层的Channel之后,我们接着来看ChannelHandler,既然现在有了通道,那么怎么进行操作呢?我们可以将需要处理的事情放在ChannelHandler中,ChannelHandler充当了所有入站和出站数据的应用程序逻辑的容器,实际上就是我们之前Reactor模式中的Handler,全靠它来处理读写操作。

不过这里不仅仅是一个简单的ChannelHandler在进行处理,而是一整套流水线,我们之后会介绍ChannelPipeline。

比如我们上面就是使用了ChannelInboundHandlerAdapter抽象类,它是ChannelInboundHandler接口的实现,用于处理入站数据,可以看到我们实际上就是通过重写对应的方法来进行处理,这些方法会在合适的时间被调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public interface ChannelInboundHandler extends ChannelHandler {
void channelRegistered(ChannelHandlerContext var1) throws Exception;

void channelUnregistered(ChannelHandlerContext var1) throws Exception;

void channelActive(ChannelHandlerContext var1) throws Exception;

void channelInactive(ChannelHandlerContext var1) throws Exception;

void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;

void channelReadComplete(ChannelHandlerContext var1) throws Exception;

void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;

void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;

void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
}

主要是ChannelInboundHandlerAdapter这个类调用的,我们新建立一个Test类来继承

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package org.wsy.nettyChannel;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.nio.charset.StandardCharsets;

public class TestChannelHandler extends ChannelInboundHandlerAdapter {

public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelRegistered");
}

public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelUnregistered");
}

public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive");
}

public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive");
}

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(Thread.currentThread().getName()+" >> 接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
//这次我们就直接使用ctx.alloc()来生成缓冲区
ByteBuf back = ctx.alloc().buffer();
back.writeCharSequence("已收到!", StandardCharsets.UTF_8);
ctx.writeAndFlush(back);
System.out.println("channelRead");
}

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelReadComplete");
}

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("userEventTriggered");
}

public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelWritabilityChanged");
}

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("exceptionCaught"+cause);
}
}
1

那么再次进入handler中的方法就是我们自定义的这个。

handler流水线:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package org.wsy.nettyChannel;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.nio.charset.StandardCharsets;

public class NettyTest02 {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
//ChannelInitializer是一个特殊的ChannelHandler,它本身不处理任何出站/入站事件,它的目的仅仅是完成Channel的初始化
.childHandler(new ChannelInitializer<SocketChannel>() { //注意,这里的SocketChannel不是我们NIO里面的,是Netty的
@Override
protected void initChannel(SocketChannel channel) {
channel.pipeline() //直接获取pipeline,然后添加两个Handler,注意顺序
.addLast(new ChannelInboundHandlerAdapter(){ //第一个用于处理消息接收
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
throw new RuntimeException("我是异常");
}
})
.addLast(new ChannelInboundHandlerAdapter(){ //第二个用于处理异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("我是异常处理:"+cause);
}
});
}
});
bootstrap.bind(8080);
}
}

如果将一个消息在两个Handler中进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package org.wsy.nettyChannel;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.nio.charset.StandardCharsets;

public class NettyTest03 {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
//ChannelInitializer是一个特殊的ChannelHandler,它本身不处理任何出站/入站事件,它的目的仅仅是完成Channel的初始化
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) {
channel.pipeline() //直接获取pipeline,然后添加两个Handler
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("1接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
ctx.fireChannelRead(msg); //通过ChannelHandlerContext
}
})
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("2接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
}
});
}
});
bootstrap.bind(8080);
}
}

出站操作在流水线上是反着来的,整个流水线操作大概流程如下: