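EventLoop (event loop)
- A single-threaded executor
- Handles the continuous stream of I/O events on a Channel: a Selector watches the Channel for accept (connection established), read (data readable), and write (writable) events, and the EventLoop processes them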
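What we used above is an EventLoopGroup, which contains multiple EventLoops. Every new connection is bound to one EventLoop, which then starts monitoring it. As long as the connection stays open, that same EventLoop remains responsible for the Channel. A single EventLoop can monitor many Channels at once, so in effect it plays the role of the Selector we studied earlier.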
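The binding rule described above can be illustrated without Netty. The following is only a JDK-based sketch, not Netty's implementation: `MiniLoopGroup`, `register`, and `fire` are hypothetical names, each "loop" is a single-threaded executor, and round-robin assignment is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK-only sketch; MiniLoopGroup and its methods are hypothetical, not Netty API.
class MiniLoopGroup {
    private final List<ExecutorService> loops = new ArrayList<>();
    private final Map<String, ExecutorService> bindings = new ConcurrentHashMap<>();
    private int nextIndex = 0;

    MiniLoopGroup(int size) {
        for (int i = 0; i < size; i++) {
            final String name = "loop-" + i;
            // Each "loop" owns exactly one thread, so its tasks run one after another.
            loops.add(Executors.newSingleThreadExecutor(r -> new Thread(r, name)));
        }
    }

    // Bind a channel id to the next loop (round-robin). The binding is never changed afterwards.
    synchronized void register(String channelId) {
        bindings.put(channelId, loops.get(nextIndex++ % loops.size()));
    }

    // Run an event on the loop the channel is bound to and return the handling thread's name.
    String fire(String channelId) {
        Callable<String> event = () -> Thread.currentThread().getName();
        try {
            return bindings.get(channelId).submit(event).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    void shutdown() {
        loops.forEach(ExecutorService::shutdown);
    }

    public static void main(String[] args) {
        MiniLoopGroup group = new MiniLoopGroup(2);
        group.register("ch-A");
        group.register("ch-B");
        group.register("ch-C");
        System.out.println("ch-A -> " + group.fire("ch-A")); // ch-A -> loop-0
        System.out.println("ch-A -> " + group.fire("ch-A")); // ch-A -> loop-0 (same loop every time)
        System.out.println("ch-B -> " + group.fire("ch-B")); // ch-B -> loop-1
        System.out.println("ch-C -> " + group.fire("ch-C")); // ch-C -> loop-0 (one loop serves several channels)
        group.shutdown();
    }
}
```

With two loops and three channels, `ch-A` and `ch-C` end up sharing `loop-0`, which mirrors how one EventLoop serves many Channels.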
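EventLoop is not limited to network operations, though. The EventLoops discussed so far are all NioEventLoops, which are dedicated to network communication. For other kinds of work we can use an ordinary EventLoop instead.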
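The following server stalls. The worker group has only one thread, so the Thread.sleep call inside channelRead blocks that EventLoop for 10 seconds, and every other Channel bound to it has to wait: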
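```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.nio.charset.StandardCharsets;

public class BlockingServer {
    public static void main(String[] args) {
        // The worker group has a single thread, so every connection shares one EventLoop
        EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup(1);
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) {
                        channel.pipeline()
                                .addLast(new ChannelInboundHandlerAdapter() {
                                    @Override
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        ByteBuf buf = (ByteBuf) msg;
                                        System.out.println("Received data from client: " + buf.toString(StandardCharsets.UTF_8));
                                        buf.release();        // this handler consumes the message, so release it
                                        Thread.sleep(10000);  // blocks the EventLoop thread for 10 seconds
                                        ctx.writeAndFlush(Unpooled.wrappedBuffer("Received!".getBytes(StandardCharsets.UTF_8)));
                                    }
                                });
                    }
                });
        bootstrap.bind(8080);
    }
}
```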
We can create an ordinary EventLoopGroup dedicated to the work that is not reading or writing, and submit the slow task to it:
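```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.nio.charset.StandardCharsets;

public class OffloadingServer {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup(1);
        // An ordinary (non-NIO) group that only runs the tasks we submit to it
        EventLoopGroup handlerGroup = new DefaultEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) {
                        channel.pipeline()
                                .addLast(new ChannelInboundHandlerAdapter() {
                                    @Override
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        ByteBuf buf = (ByteBuf) msg;
                                        System.out.println("Received data from client: " + buf.toString(StandardCharsets.UTF_8));
                                        buf.release();  // the payload is no longer needed after printing
                                        // Hand the slow part to handlerGroup so the worker EventLoop stays free
                                        handlerGroup.submit(() -> {
                                            try {
                                                Thread.sleep(10000);
                                            } catch (InterruptedException e) {
                                                Thread.currentThread().interrupt();  // keep the interrupt flag
                                                return;
                                            }
                                            ctx.writeAndFlush(Unpooled.wrappedBuffer("Received!".getBytes(StandardCharsets.UTF_8)));
                                        });
                                    }
                                });
                    }
                });
        bootstrap.bind(8080);
    }
}
```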
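To make the difference between the two servers above concrete, here is a small JDK-only analogy rather than Netty code. `BlockingDemo` and `run` are hypothetical names, a single-threaded executor stands in for the one worker EventLoop, a second executor stands in for `handlerGroup`, and the 300 ms sleep is an arbitrary stand-in for the 10-second delay.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// JDK-only analogy; BlockingDemo and run(...) are hypothetical names, not Netty API.
class BlockingDemo {
    // Returns the order in which things happened for two clients sharing one "I/O loop".
    static List<String> run(boolean offload) {
        ExecutorService ioLoop = Executors.newSingleThreadExecutor();      // the single worker loop
        ExecutorService handlerPool = Executors.newSingleThreadExecutor(); // the separate handler group
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(2);

        Runnable slowWork = () -> {
            sleep(300);
            log.add("A: reply sent");
            done.countDown();
        };

        // Client A's read event triggers the slow work.
        ioLoop.execute(() -> {
            log.add("A: read");
            if (offload) {
                handlerPool.execute(slowWork); // the loop thread returns immediately
            } else {
                slowWork.run();                // the loop thread is stuck for 300 ms
            }
        });
        // Client B's read event is queued right behind A's on the same loop.
        ioLoop.execute(() -> {
            log.add("B: read");
            done.countDown();
        });

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            ioLoop.shutdown();
            handlerPool.shutdown();
        }
        return log;
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [A: read, A: reply sent, B: read]
        System.out.println(run(true));  // [A: read, B: read, A: reply sent]
    }
}
```

In the blocking run, client B is served only after client A's slow work has finished. In the offloading run, B is served right away and A's reply arrives later.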
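Alternatively, we can split the work into a pipeline and assign the slow handler its own EventLoopGroup when adding it: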
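```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.ReferenceCountUtil;
import java.nio.charset.StandardCharsets;

public class PipelineServer {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup(1);
        EventLoopGroup handlerGroup = new DefaultEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) {
                        channel.pipeline()
                                // First handler: runs on the worker EventLoop and only reads
                                .addLast(new ChannelInboundHandlerAdapter() {
                                    @Override
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        ByteBuf buf = (ByteBuf) msg;
                                        System.out.println("Received data from client: " + buf.toString(StandardCharsets.UTF_8));
                                        ctx.fireChannelRead(msg);  // pass the message to the next handler
                                    }
                                })
                                // Second handler: runs on handlerGroup, so its delay does not block the worker
                                .addLast(handlerGroup, new ChannelInboundHandlerAdapter() {
                                    @Override
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        ReferenceCountUtil.release(msg);  // last handler to see the message
                                        Thread.sleep(10000);
                                        ctx.writeAndFlush(Unpooled.wrappedBuffer("Received!".getBytes(StandardCharsets.UTF_8)));
                                    }
                                });
                    }
                });
        bootstrap.bind(8080);
    }
}
```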
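Future and Promise
Next, let's look at ChannelFuture. As mentioned earlier, Channel operations in Netty are asynchronous: they do not run synchronously on the calling thread, so the result is not available immediately. To obtain the result, we have to use a Future.
Let's first see how the ChannelFuture interface is defined: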
```java
public interface ChannelFuture extends Future<Void> {
    Channel channel();
    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
    ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
    ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
    ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
    ChannelFuture sync() throws InterruptedException;
    ChannelFuture syncUninterruptibly();
    ChannelFuture await() throws InterruptedException;
    ChannelFuture awaitUninterruptibly();
    boolean isVoid();
}
```
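There are two ways to act on the result: wait for it synchronously with sync(), or register a listener that is called back when the operation completes:

```java
ChannelFuture future = bootstrap.bind(8080);

// Option 1: block the current thread until the bind completes (throws InterruptedException)
future.sync();

// Option 2: register a listener that is invoked once the bind completes
future.addListener(f -> System.out.println("This runs after the server has finished starting!"));
```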
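The ordering difference between waiting and listening can be observed with plain JDK classes. This is only an analogy, not Netty code: `CompletableFuture.join()` stands in for waiting synchronously, `thenAccept` stands in for a listener, and `FutureStyles` and `bindAsync` are hypothetical names whose 100 ms delay merely simulates an asynchronous bind.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// JDK-only analogy; FutureStyles and bindAsync are hypothetical names, not Netty API.
class FutureStyles {
    // Stand-in for an asynchronous operation such as bind(): completes on another thread.
    static CompletableFuture<Integer> bindAsync(int port) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return port;
        });
    }

    static List<String> demo() {
        List<String> log = new CopyOnWriteArrayList<>();

        // Style 1: block the caller until the result is ready.
        int port = bindAsync(8080).join();
        log.add("sync: bound " + port);

        // Style 2: register a callback and carry on without waiting.
        CompletableFuture<Void> notified =
                bindAsync(8081).thenAccept(p -> log.add("listener: bound " + p));
        log.add("caller continues");

        notified.join(); // only so the demo does not return before the callback has fired
        return log;
    }

    public static void main(String[] args) {
        // [sync: bound 8080, caller continues, listener: bound 8081]
        System.out.println(demo());
    }
}
```

The caller's own line is logged before the callback fires, which is the point of the listener style: the calling thread is never parked.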
The Promise interface extends Future and additionally lets us set the success or failure result manually:
```java
public interface Promise<V> extends Future<V> {
    Promise<V> setSuccess(V result);
    boolean trySuccess(V result);
    Promise<V> setFailure(Throwable cause);
    boolean tryFailure(Throwable cause);
    boolean setUncancellable();
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    Promise<V> await() throws InterruptedException;
    Promise<V> awaitUninterruptibly();
    Promise<V> sync() throws InterruptedException;
    Promise<V> syncUninterruptibly();
}
```
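The idea of setting a result by hand can be tried out with the JDK's `CompletableFuture`, which is used here purely as an analogy for a Promise. `PromiseAnalogy` and `settle` are hypothetical names. In this sketch `complete` and `completeExceptionally` play roughly the role of `trySuccess` and `tryFailure`, since they also return false when a result has already been set.

```java
import java.util.concurrent.CompletableFuture;

// JDK-only analogy; PromiseAnalogy and settle(...) are hypothetical names, not Netty API.
class PromiseAnalogy {
    // Completes a promise-like future by hand and describes what the consumer observed.
    static String settle(boolean succeed) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        StringBuilder outcome = new StringBuilder();

        // Consumer side: react to whichever result the producer sets.
        promise.whenComplete((value, error) ->
                outcome.append(error == null ? "success: " + value : "failure: " + error.getMessage()));

        // Producer side: set the result manually.
        if (succeed) {
            promise.complete("done");
        } else {
            promise.completeExceptionally(new IllegalStateException("boom"));
        }

        // A result can be set only once, so a second attempt is rejected.
        boolean secondAttempt = promise.complete("again");
        return outcome + ", second attempt accepted: " + secondAttempt;
    }

    public static void main(String[] args) {
        System.out.println(settle(true));  // success: done, second attempt accepted: false
        System.out.println(settle(false)); // failure: boom, second attempt accepted: false
    }
}
```

In Netty itself, `setSuccess` and `setFailure` throw an IllegalStateException when the Promise is already complete, whereas `trySuccess` and `tryFailure` report it through their boolean return value.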