没有想到,整整一个九月,就看了 Netty 的一个方法。
九月份学习完 ServerBootstrap,下个月就先不看 Netty 了,换一换脑子。
在上一篇博文中,我们初识了 Netty,知道了它是基于 NIO 的异步非阻塞 I/O 框架,它能做很多事情但它也有很多类。
Netty 进行 I/O 操作是靠 NIO 的 Channel 类实现的,不过 Netty 自己也有一个 Channel 类,它内部包着 NIO 的 Channel(有点绕)。Netty 的 Channel 类有特别多的变量,它几乎包含着 I/O 操作所需要的全部东西,因此创建一个 Channel 并设置参数,是一件相当复杂的事。Netty 考虑到这一点,提供了 Bootstrap 类,这个类就是为创建 Channel 而生的,它的作用就是方便快捷地创建 Channel。
再看一遍这句解释,体会一下 Bootstrap 的作用:
AbstractBootstrap is a helper class that makes it easy to bootstrap a Channel.
上面所说的 AbstractBootstrap 是一切 Netty 中 Bootstrap 的父类,我们今天要学习的是两种 Bootstrap:ServerBootstrap
和 Bootstrap
。前者 ServerBootstrap
用来创建服务端 Channel,后者 Bootstrap
用来创建客户端 Channel。
回顾一下服务端和客户端。(粗浅地讲)服务端提供中心服务功能,暴露出来一个端口,用户那边的客户端,可以通过刚才那个端口连接到服务端。服务端和客户端通过同一个端口进行 I/O 操作,服务端绑定(bind)端口,客户端连接(connect)端口,两边都连好了,就可以通信了。
ServerBootstrap 和 Bootstrap 可以各自通过 bind(…) 和 connect(…) 方法,生成一个 ChannelFuture 实例,这个 ChannelFuture 实例就代表着 I/O 操作的结果。
1 2 3 4 ChannelFuture serverFuture = serverBootstrap.bind(8080 ); ChannelFuture clientFuture = bootstrap.connect("127.0.0.1" , 8080 );
在 serverBootstrap 执行 bind(…) 方法时,它将隐含地创建 Netty 的 Channel 实例,并把自己的参数都设置到 Channel 身上,这也就是我们刚才说的,Bootstrap 的作用就是方便快捷地创建 Channel 实例(实际上我们从始至终都没看到 Channel 实例)。对于 bootstrap 也是一样,它执行 connect(…) 方法时也会隐含地创建一个 Channel 实例,设置好参数,连接端口,最后返回一个 ChannelFuture。
重新梳理一遍:
ServerBootstrap 和 Bootstrap 各自用来快速创建服务端的 Channel
和客户端的 Channel
,并用这个 Channel 来进行 I/O 操作(这个过程对外是隐藏的)。
ServerBootstrap 对象执行 bind(…) 方法,绑定端口,这个过程中会隐含地创建服务端的 Channel
。
Bootstrap 对象执行 connect(…) 方法,连接端口,这个过程中会隐含地创建客户端的 Channel
。
如果到这里不能理解,那么接下去的内容应该也不用看了。
下面先不提 bind(…) 和 connect(…) 方法,先开始讲 ServerBootstrap 和 Bootstrap 是怎么创建的,又是怎么设置复杂的参数的。
ServerBootstrap 服务端 Channel 的引导类。
虽说是“方便快捷”地创建 Channel,但其实代码还挺多的。下面是示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel (SocketChannel ch) { ch.pipeline().addLast(new DiscardServerHandler()); } }) .handler(new LoggingHandler(LogLevel.INFO)) .option(ChannelOption.SO_BACKLOG, 128 ) .childOption(ChannelOption.SO_KEEPALIVE, true );
ServerBootstrap 继承自 AbstractBootstrap,两个类的实例变量加在一起有 11 个,基本都有用处。
最开始的代码就是为了设置这些变量,ServerBootstrap 实例调了 6 个方法设置好了 7 个参数,设置的过程可以参照下图:
简单地过一遍这些变量,都是用来做什么的:
变量
作用
channelFactory
ServerBootstrap 是服务端 Channel 的引导类,它是用来快速创建并配置 Channel 的,而创建 Channel 就是通过这个变量,利用工厂方法创建的
group
这是一个线程池(boss),它的作用是连接,在绑定端口之后接收连接,每接到一个连接,指派给 childGroup 去做 I/O 操作
childGroup
这也是一个线程池(worker),它的作用是 I/O,也就是负责具体的 I/O 操作,一个线程负责一个 Channel 的 I/O
handler
设置 ChannelHandler,用于处理服务端 Channel 的发送
childHandler
设置 ChannelHandler,用于处理客户端 Channel 的接收
option
设置服务端 Channel 的选项参数
childOption
设置客户端 Channel 的选项参数
config
其实就是 Bootstrap 对象自身(this),作用是在父类的方法中,也能获得本对象的一些参数
写累了,看代码吧。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 public ServerBootstrap group (EventLoopGroup parentGroup, EventLoopGroup childGroup) { super .group(parentGroup); if (this .childGroup != null ) { throw new IllegalStateException("childGroup set already" ); } this .childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup" ); return this ; } public B group (EventLoopGroup group) { ObjectUtil.checkNotNull(group, "group" ); if (this .group != null ) { throw new IllegalStateException("group set already" ); } this .group = group; return self(); } public B channel (Class<? extends C> channelClass) { return channelFactory(new ReflectiveChannelFactory<C>( ObjectUtil.checkNotNull(channelClass, "channelClass" ) )); } public B channelFactory (io.netty.channel.ChannelFactory<? extends C> channelFactory) { return channelFactory((ChannelFactory<C>) channelFactory); } public B channelFactory (ChannelFactory<? extends C> channelFactory) { ObjectUtil.checkNotNull(channelFactory, "channelFactory" ); if (this .channelFactory != null ) { throw new IllegalStateException("channelFactory set already" ); } this .channelFactory = channelFactory; return self(); } public ReflectiveChannelFactory (Class<? extends T> clazz) { ObjectUtil.checkNotNull(clazz, "clazz" ); try { this .constructor = clazz.getConstructor(); } catch (NoSuchMethodException e) { throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) + " does not have a public non-arg constructor" , e); } } public ServerBootstrap childHandler (ChannelHandler childHandler) { this .childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler" ); return this ; } public B handler (ChannelHandler handler) { this .handler = ObjectUtil.checkNotNull(handler, "handler" ); return self(); } public <T> B option (ChannelOption<T> option, T value) { ObjectUtil.checkNotNull(option, "option" ); synchronized (options) { if (value == null ) { options.remove(option); } else { options.put(option, value); } } return self(); } public <T> ServerBootstrap childOption (ChannelOption<T> childOption, T value) { ObjectUtil.checkNotNull(childOption, "childOption" ); synchronized (childOptions) { if (value == null ) { childOptions.remove(childOption); } else { childOptions.put(childOption, value); } } return this ; }
Bootstrap 客户端 Channel 的引导类。
Bootstrap 相比 ServerBootstrap,只有减少没有增加,简单看一个示例就可以了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workerGroup) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true ) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel (SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LoggingHandler(LogLevel.INFO)); pipeline.addLast(new EchoServerHandler()); } });
应该能看出一点套路了。
下图来自《Netty 源码解析系列 》,是 ServerBootstrap 和 Bootstrap 的使用套路,再感受一下。
接下来,我们只关注一行代码:
1 ChannelFuture future = serverBootstrap.bind(port).sync();
ServerBootstrap 的 bind(port)
方法的作用是创建一个 Channel 对象,并绑定上一个端口,它返回的是 ChannelFuture(更准确地讲,是 DefaultChannelPromise),也就是说在执行 bind(port)
方法之后,会获取一个 Future 对象,bind 的结果将异步设置进 Future 对象中。
sync()
方法其实只是在等待,一直等到异步结果返回为止,也就是说,执行完这行代码,channel 已经绑定上端口号了。因此 sync()
并不是重点,我们重点要看的是 bind(port)
方法。下面的所有内容,都是在学习 bind(port)
方法究竟做了什么,很长。
把结果放在最前面,后续代码太多,只写注释好了。
bind(port)
方法主要做了这些事情:
创建 Channel 对象
初始化 channel(尤其是初始化 pipeline)
注册 channel
创建 JDK 底层的 Channel 对象并注册 selectionKey
pipeline 上 handler 执行回调事件
pipeline 上 handler 执行注册事件(registered)
pipeline 上 handler 执行启动事件(active)
channel 绑定端口号
开始看代码吧:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 public ChannelFuture bind (int inetPort) { return bind(new InetSocketAddress(inetPort)); } public ChannelFuture bind (SocketAddress localAddress) { validate(); return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress" )); } private ChannelFuture doBind (final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null ) { return regFuture; } if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete (ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null ) { promise.setFailure(cause); } else { promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
接下来有一个非常庞大的方法:initAndRegister()
。
这个方法拆成 init
和 register
两部分来看,先看 init
的部分。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 final ChannelFuture initAndRegister () { Channel channel = null ; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null ) { channel.unsafe().closeForcibly(); return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null ) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; } @Override void init (Channel channel) { setChannelOptions(channel, newOptionsArray(), logger); setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY)); ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY); } final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY); p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel (final Channel ch) { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null ) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run () { pipeline.addLast(new ServerBootstrap.ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); } @Override public final ChannelPipeline addLast (ChannelHandler... handlers) { return addLast(null , handlers); } @Override public final ChannelPipeline addLast (EventExecutorGroup executor, ChannelHandler... handlers) { ObjectUtil.checkNotNull(handlers, "handlers" ); for (ChannelHandler h: handlers) { if (h == null ) { break ; } addLast(executor, null , h); } return this ; } @Override public final ChannelPipeline addLast (EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this ) { checkMultiplicity(handler); newCtx = newContext(group, filterName(name, handler), handler); addLast0(newCtx); if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true ); return this ; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { callHandlerAddedInEventLoop(newCtx, executor); return this ; } } callHandlerAdded0(newCtx); return this ; } private void callHandlerCallbackLater (AbstractChannelHandlerContext ctx, boolean added) { assert !registered; DefaultChannelPipeline.PendingHandlerCallback task = added ? new DefaultChannelPipeline.PendingHandlerAddedTask(ctx) : new DefaultChannelPipeline.PendingHandlerRemovedTask(ctx); DefaultChannelPipeline.PendingHandlerCallback pending = pendingHandlerCallbackHead; if (pending == null ) { pendingHandlerCallbackHead = task; } else { while (pending.next != null ) { pending = pending.next; } pending.next = task; } } private final class PendingHandlerAddedTask extends DefaultChannelPipeline .PendingHandlerCallback { PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) { super (ctx); } @Override public void run () { callHandlerAdded0(ctx); } @Override void execute () { EventExecutor executor = ctx.executor(); if (executor.inEventLoop()) { callHandlerAdded0(ctx); } else { try { executor.execute(this ); } catch (RejectedExecutionException e) { if (logger.isWarnEnabled()) { logger.warn("Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}." , executor, ctx.name(), e); } atomicRemoveFromHandlerList(ctx); ctx.setRemoved(); } } } }
上面看完了 initAndRegister()
方法的 init
部分,接下来看 register
部分。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 @Override public ChannelFuture register (Channel channel) { return next() .register(channel); } @Override public ChannelFuture register (Channel channel) { return register( new DefaultChannelPromise(channel, this ) ); } @Override public ChannelFuture register (final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise" ); promise.channel().unsafe().register(this , promise); return promise; } @Override public final void register (EventLoop eventLoop, final ChannelPromise promise) { ObjectUtil.checkNotNull(eventLoop, "eventLoop" ); if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already" )); return ; } if (!isCompatible(eventLoop)) { promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return ; } AbstractChannel.this .eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(() -> { register0(promise); }); } catch (Throwable t) { logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}" , AbstractChannel.this , t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } private void register0 (ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return ; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false ; registered = true ; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } final void invokeHandlerAddedIfNeeded () { assert channel.eventLoop().inEventLoop(); if (firstRegistration) { firstRegistration = false ; callHandlerAddedForAllHandlers(); } } private void callHandlerAddedForAllHandlers () { final DefaultChannelPipeline.PendingHandlerCallback pendingHandlerCallbackHead; synchronized (this ) { assert !registered; registered = true ; pendingHandlerCallbackHead = this .pendingHandlerCallbackHead; this .pendingHandlerCallbackHead = null ; } DefaultChannelPipeline.PendingHandlerCallback task = pendingHandlerCallbackHead; while (task != null ) { task.execute(); task = task.next; } } private void callHandlerAdded0 (final AbstractChannelHandlerContext ctx) { try { ctx.callHandlerAdded(); } catch (Throwable t) { boolean removed = false ; try { atomicRemoveFromHandlerList(ctx); ctx.callHandlerRemoved(); removed = true ; } catch (Throwable t2) { if (logger.isWarnEnabled()) { logger.warn("Failed to remove a handler: " + ctx.name(), t2); } } if (removed) { fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; removed." , t)); } else { fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; also failed to remove." , t)); } } } final void callHandlerAdded () throws Exception { if (setAddComplete()) { handler().handlerAdded(this ); } } @Override public void handlerAdded (ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isRegistered()) { if (initChannel(ctx)) { removeState(ctx); } } } private boolean initChannel (ChannelHandlerContext ctx) throws Exception { if (initMap.add(ctx)) { try { initChannel((C) ctx.channel()); } catch (Throwable cause) { exceptionCaught(ctx, cause); } finally { ChannelPipeline pipeline = ctx.pipeline(); if (pipeline.context(this ) != null ) { pipeline.remove(this ); } } return true ; } return false ; } @Override void init (Channel channel) { setChannelOptions(channel, newOptionsArray(), logger); setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY)); ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Map.Entry<ChannelOption<?>, Object>[] currentChildOptions; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY); } final Map.Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY); p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel (final Channel ch) { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null ) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run () { pipeline.addLast(new ServerBootstrap.ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
上面有大量的代码在处理 handler,下面画一张图,来描述一下 handler 是如何从 ServerBootstrap 加入到 pipeline 中的。
还有最后一点代码,把 bind0()
方法看完。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 private ChannelFuture doBind (final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null ) { return regFuture; } if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete (ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null ) { promise.setFailure(cause); } else { promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } } private static void doBind0 ( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(() -> { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } }); } @Override public ChannelFuture bind (SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise); } @Override public final ChannelFuture bind (SocketAddress localAddress, ChannelPromise promise) { return tail.bind(localAddress, promise); } @Override public ChannelFuture bind (final SocketAddress localAddress, final ChannelPromise promise) { ObjectUtil.checkNotNull(localAddress, "localAddress" ); if (isNotValidPromise(promise, false )) { return promise; } final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeBind(localAddress, promise); } else { safeExecute(executor, () -> { next.invokeBind(localAddress, promise); }, promise, null , false ); } return promise; } private void invokeBind (SocketAddress localAddress, ChannelPromise promise) { if (invokeHandler()) { try { ((ChannelOutboundHandler) handler()).bind(this , localAddress, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } else { bind(localAddress, promise); } } @Override public void bind (ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) { unsafe.bind(localAddress, promise); }
就写到这里吧。