Netty工作模型
Netty为我们提供的更高级的缓冲区类,我们接着来看看Netty是如何工作的,上一章我们介绍了Reactor模式,而Netty正是以主从Reactor多线程模型为基础,构建出了一套高效的工作模型。
- Netty 抽象出两组线程池BossGroup和WorkerGroup,BossGroup专门负责接受客户端的连接, WorkerGroup专门负读写,就像我们前面说的主从Reactor一样。
- 无论是BossGroup还是WorkerGroup,都是使用EventLoop(事件循环,很多系统都采用了事件循环机制,比如前端框架Node.js,事件循环顾名思义,就是一个循环,不断地进行事件通知)来进行事件监听的,整个Netty也是使用事件驱动来运作的,比如当客户端已经准备好读写、连接建立时,都会进行事件通知,说白了就像我们之前写NIO多路复用那样,只不过这里换成EventLoop了而已,它已经帮助我们封装好了一些常用操作,而且我们可以自己添加一些额外的任务,如果有多个EventLoop,会存放在EventLoopGroup中,EventLoopGroup就是BossGroup和WorkerGroup的具体实现。
- 在BossGroup之后,会正常将SocketChannel绑定到WorkerGroup中的其中一个EventLoop上,进行后续的读写操作监听。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) { channel.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf buf = (ByteBuf) msg; System.out.println(Thread.currentThread().getName()+" >> 接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8)); ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes())); } }); } }); bootstrap.bind(8080); }
|
Alt+7查看所有的方法,Crtl+H是查看类的继承关系
Channel详解
在学习NIO时,我们就已经接触到Channel了,我们可以通过通道来进行数据的传输,并且通道支持双向传输。
而在Netty中,也有对应的Channel类型:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> { ChannelId id(); EventLoop eventLoop(); Channel parent(); ChannelConfig config(); boolean isOpen(); boolean isRegistered(); boolean isActive(); ChannelMetadata metadata(); SocketAddress localAddress(); SocketAddress remoteAddress(); ChannelFuture closeFuture(); boolean isWritable(); long bytesBeforeUnwritable(); long bytesBeforeWritable(); Unsafe unsafe(); ChannelPipeline pipeline(); ByteBufAllocator alloc(); Channel read(); Channel flush(); }
|
可以看到,Netty中的Channel相比NIO功能就多得多了。Netty中的Channel主要特点如下:
- 所有的IO操作都是异步的,并不是在当前线程同步运行,方法调用之后就直接返回了,那怎么获取操作的结果呢?还记得我们在前面JUC篇教程中学习的Future吗,没错,这里的ChannelFuture也是干这事的。
我们可以来看一下Channel接口的父接口ChannelOutboundInvoker接口,这里面定义了大量的I/O操作
我们了解了Netty底层的Channel之后,我们接着来看ChannelHandler,既然现在有了通道,那么怎么进行操作呢?我们可以将需要处理的事情放在ChannelHandler中,ChannelHandler充当了所有入站和出站数据的应用程序逻辑的容器,实际上就是我们之前Reactor模式中的Handler,全靠它来处理读写操作。
不过这里不仅仅是一个简单的ChannelHandler在进行处理,而是一整套流水线,我们之后会介绍ChannelPipeline。
比如我们上面就是使用了ChannelInboundHandlerAdapter抽象类,它是ChannelInboundHandler接口的实现,用于处理入站数据,可以看到我们实际上就是通过重写对应的方法来进行处理,这些方法会在合适的时间被调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public interface ChannelInboundHandler extends ChannelHandler { void channelRegistered(ChannelHandlerContext var1) throws Exception;
void channelUnregistered(ChannelHandlerContext var1) throws Exception;
void channelActive(ChannelHandlerContext var1) throws Exception;
void channelInactive(ChannelHandlerContext var1) throws Exception;
void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;
void channelReadComplete(ChannelHandlerContext var1) throws Exception;
void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;
void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;
void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception; }
|
主要是ChannelInboundHandlerAdapter这个类调用的,我们新建立一个Test类来继承
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| package org.wsy.nettyChannel;
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.StandardCharsets;
public class TestChannelHandler extends ChannelInboundHandlerAdapter {
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channelRegistered"); }
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channelUnregistered"); }
public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelActive"); }
public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelInactive"); }
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println(Thread.currentThread().getName()+" >> 接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8)); ByteBuf back = ctx.alloc().buffer(); back.writeCharSequence("已收到!", StandardCharsets.UTF_8); ctx.writeAndFlush(back); System.out.println("channelRead"); }
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("channelReadComplete"); }
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { System.out.println("userEventTriggered"); }
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { System.out.println("channelWritabilityChanged"); }
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("exceptionCaught"+cause); } }
|
那么再次进入handler中的方法就是我们自定义的这个。
handler流水线:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| package org.wsy.nettyChannel;
import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.nio.charset.StandardCharsets;
public class NettyTest02 { public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) { channel.pipeline() .addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8)); throw new RuntimeException("我是异常"); } }) .addLast(new ChannelInboundHandlerAdapter(){ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("我是异常处理:"+cause); } }); } }); bootstrap.bind(8080); } }
|
如果将一个消息在两个Handler中进行处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| package org.wsy.nettyChannel;
import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.nio.charset.StandardCharsets;
public class NettyTest03 { public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) { channel.pipeline() .addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("1接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8)); ctx.fireChannelRead(msg); } }) .addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("2接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8)); } }); } }); bootstrap.bind(8080); } }
|
出站操作在流水线上是反着来的,整个流水线操作大概流程如下: