Netty 的 Bootstrap


没有想到,整整一个九月,就看了 Netty 的一个方法。

九月份学习完 ServerBootstrap,下个月就先不看 Netty 了,换一换脑子。


在上一篇博文中,我们初识了 Netty,知道了它是基于 NIO 的异步非阻塞 I/O 框架,它能做很多事情但它也有很多类。

Netty 进行 I/O 操作是靠 NIO 的 Channel 类实现的,不过 Netty 自己也有一个 Channel 类,它内部包着 NIO 的 Channel(有点绕)。Netty 的 Channel 类有特别多的变量,它几乎包含着 I/O 操作所需要的全部东西,因此创建一个 Channel 并设置参数,是一件相当复杂的事。Netty 考虑到这一点,提供了 Bootstrap 类,这个类就是为创建 Channel 而生的,它的作用就是方便快捷地创建 Channel。

再看一遍这句解释,体会一下 Bootstrap 的作用:

AbstractBootstrap is a helper class that makes it easy to bootstrap a Channel.

上面所说的 AbstractBootstrap 是一切 Netty 中 Bootstrap 的父类,我们今天要学习的是两种 Bootstrap:ServerBootstrapBootstrap。前者 ServerBootstrap 用来创建服务端 Channel,后者 Bootstrap 用来创建客户端 Channel。

回顾一下服务端和客户端。(粗浅地讲)服务端提供中心服务功能,暴露出来一个端口,用户那边的客户端,可以通过刚才那个端口连接到服务端。服务端和客户端通过同一个端口进行 I/O 操作,服务端绑定(bind)端口,客户端连接(connect)端口,两边都连好了,就可以通信了。

ServerBootstrap 和 Bootstrap 可以各自通过 bind(…) 和 connect(…) 方法,生成一个 ChannelFuture 实例,这个 ChannelFuture 实例就代表着 I/O 操作的结果。

1
2
3
4
// serverBootstrap 绑定端口 8080
ChannelFuture serverFuture = serverBootstrap.bind(8080);
// bootstrap 连接端口 127.0.0.1:8080
ChannelFuture clientFuture = bootstrap.connect("127.0.0.1", 8080);

在 serverBootstrap 执行 bind(…) 方法时,它将隐含地创建 Netty 的 Channel 实例,并把自己的参数都设置到 Channel 身上,这也就是我们刚才说的,Bootstrap 的作用就是方便快捷地创建 Channel 实例(实际上我们从始至终都没看到 Channel 实例)。对于 bootstrap 也是一样,它执行 connect(…) 方法时也会隐含地创建一个 Channel 实例,设置好参数,连接端口,最后返回一个 ChannelFuture。

重新梳理一遍:

  • ServerBootstrap 和 Bootstrap 各自用来快速创建服务端的 Channel客户端的 Channel,并用这个 Channel 来进行 I/O 操作(这个过程对外是隐藏的)。
  • ServerBootstrap 对象执行 bind(…) 方法,绑定端口,这个过程中会隐含地创建服务端的 Channel
  • Bootstrap 对象执行 connect(…) 方法,连接端口,这个过程中会隐含地创建客户端的 Channel

如果到这里不能理解,那么接下去的内容应该也不用看了。

下面先不提 bind(…) 和 connect(…) 方法,先开始讲 ServerBootstrap 和 Bootstrap 是怎么创建的,又是怎么设置复杂的参数的。


Netty_Bootstrap

ServerBootstrap

服务端 Channel 的引导类。

虽说是“方便快捷”地创建 Channel,但其实代码还挺多的。下面是示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 创建 ServerBootstrap
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 配置 ServerBootstrap
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.handler(new LoggingHandler(LogLevel.INFO))
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);

ServerBootstrap 继承自 AbstractBootstrap,两个类的实例变量加在一起有 11 个,基本都有用处。

ServerBootstrap初始化

最开始的代码就是为了设置这些变量,ServerBootstrap 实例调了 6 个方法设置好了 7 个参数,设置的过程可以参照下图:

ServerBootstrap设置

简单地过一遍这些变量,都是用来做什么的:

变量 作用
channelFactory ServerBootstrap 是服务端 Channel 的引导类,它是用来快速创建并配置 Channel 的,而创建 Channel 就是通过这个变量,利用工厂方法创建的
group 这是一个线程池(boss),它的作用是连接,在绑定端口之后接收连接,每接到一个连接,指派给 childGroup 去做 I/O 操作
childGroup 这也是一个线程池(worker),它的作用是 I/O,也就是负责具体的 I/O 操作,一个线程负责一个 Channel 的 I/O
handler 设置 ChannelHandler,用于处理服务端 Channel 的发送
childHandler 设置 ChannelHandler,用于处理客户端 Channel 的接收
option 设置服务端 Channel 的选项参数
childOption 设置客户端 Channel 的选项参数
config 其实就是 Bootstrap 对象自身(this),作用是在父类的方法中,也能获得本对象的一些参数

写累了,看代码吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
/* —————————————————————————— group —————————————————————————— */
// 设置【parentGroup】和【childGroup】

/**
* ServerBootstrap # group
*/
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
return this;
}

/**
* AbstractBootstrap # group
* 父类的 group(...) 方法
*/
public B group(EventLoopGroup group) {
ObjectUtil.checkNotNull(group, "group");
if (this.group != null) {
throw new IllegalStateException("group set already");
}
this.group = group;
return self();
}


/* —————————————————————————— channel —————————————————————————— */
// 设置【channelFactory】
// 传入的是 Channel 子类,取它的构造方法,封装成 channelFactory,以后用它创建 channel

/**
* ServerBootstrap # channel
*/
public B channel(Class<? extends C> channelClass) {
// ReflectiveChannelFactory 类的构造方法一直往下看
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}

/**
* ServerBootstrap # channel
*/
public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
return channelFactory((ChannelFactory<C>) channelFactory);
}

/**
* ServerBootstrap # channel
*/
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
ObjectUtil.checkNotNull(channelFactory, "channelFactory");
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return self();
}

/**
* 这个类继承自 ChannelFactory
* 通过反射获取指定 Channel 类的构造方法,以后用 constructor 生产 channel
*/
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}


/* —————————————————————————— childHandler —————————————————————————— */
// 设置【childHandler】
// 这里传入的是 ChannelInitializer,它是 ChannelHandler 的子类,相当于一个大的 handler,里面套着很多小 handler

/**
* ServerBootstrap # childHandler
*/
public ServerBootstrap childHandler(ChannelHandler childHandler) {
this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
return this;
}


/* —————————————————————————— handler —————————————————————————— */
// 设置【handler】

/**
* AbstractBootstrap # handler
* 父类实现
*/
public B handler(ChannelHandler handler) {
this.handler = ObjectUtil.checkNotNull(handler, "handler");
return self();
}


/* —————————————————————————— option —————————————————————————— */
// 设置【options】
// 在 options(一张 LinkedHashMap)中增加 [option - value] 键值对

/**
* AbstractBootstrap # option
* 父类实现
*/
public <T> B option(ChannelOption<T> option, T value) {
ObjectUtil.checkNotNull(option, "option");
synchronized (options) {
if (value == null) {
options.remove(option);
} else {
options.put(option, value);
}
}
return self();
}


/* —————————————————————— childOptions —————————————————————— */
// 设置【childOptions】
// 在 childOptions(一张 LinkedHashMap)中增加 [option - value] 键值对

/**
* ServerBootstrap # childOption
*/
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
ObjectUtil.checkNotNull(childOption, "childOption");
synchronized (childOptions) {
if (value == null) {
childOptions.remove(childOption);
} else {
childOptions.put(childOption, value);
}
}
return this;
}

Bootstrap

客户端 Channel 的引导类。

Bootstrap 相比 ServerBootstrap,只有减少没有增加,简单看一个示例就可以了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 创建 Bootstrap
Bootstrap bootstrap = new Bootstrap();
// 配置 Bootstrap
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
pipeline.addLast(new EchoServerHandler());
}
});

应该能看出一点套路了。

下图来自《Netty 源码解析系列》,是 ServerBootstrap 和 Bootstrap 的使用套路,再感受一下。

bootstrap配置套路


接下来,我们只关注一行代码:

1
ChannelFuture future = serverBootstrap.bind(port).sync();

ServerBootstrap 的 bind(port) 方法的作用是创建一个 Channel 对象,并绑定上一个端口,它返回的是 ChannelFuture(更准确地讲,是 DefaultChannelPromise),也就是说在执行 bind(port) 方法之后,会获取一个 Future 对象,bind 的结果将异步设置进 Future 对象中。

sync() 方法其实只是在等待,一直等到异步结果返回为止,也就是说,执行完这行代码,channel 已经绑定上端口号了。因此 sync() 并不是重点,我们重点要看的是 bind(port) 方法。下面的所有内容,都是在学习 bind(port) 方法究竟做了什么,很长。


把结果放在最前面,后续代码太多,只写注释好了。

bind(port) 方法主要做了这些事情:

  1. 创建 Channel 对象
  2. 初始化 channel(尤其是初始化 pipeline)
  3. 注册 channel
    1. 创建 JDK 底层的 Channel 对象并注册 selectionKey
    2. pipeline 上 handler 执行回调事件
    3. pipeline 上 handler 执行注册事件(registered)
    4. pipeline 上 handler 执行启动事件(active)
  4. channel 绑定端口号

开始看代码吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* AbstractBootstrap # bind
*/
public ChannelFuture bind(int inetPort) {
// 将端口号 port 转换成 InetSocketAddress(java.net 的类)
// 就不往里挖了,最后会转换成 [0.0.0.0/0.0.0.0:port]
return bind(new InetSocketAddress(inetPort));
}

/**
* AbstractBootstrap # bind
*/
public ChannelFuture bind(SocketAddress localAddress) {
// 校验 group 和 channelFactory 是否不为空,如果为空就抛异常
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}

/**
* AbstractBootstrap # doBind
*
* 这是所有逻辑的核心中转站
* 1.init
* 2.register
* 3.bind
*/
private ChannelFuture doBind(final SocketAddress localAddress) {
// 创建 channel,初始化,注册,返回注册结果
final ChannelFuture regFuture = initAndRegister();

final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}

// channel 注册完成,直接调用 doBind0(...) 方法
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// channel 没注册完成,加一个 listener,等它完成自己去调用 doBind(...) 方法
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}

接下来有一个非常庞大的方法:initAndRegister()

这个方法拆成 initregister 两部分来看,先看 init 的部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
/**
* AbstractBootstrap # initAndRegister
*
* 1.创建 Channel 实例
* 2.初始化 channel(设置 config、attr、pipeline、pipeline#handler)
* 3.注册 channel(很复杂)
*
* 没关注的问题:
* Channel 的构造方法
*/
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 创建 channel 实例
// 内部是根据,之前通过反射获取的构造函数,创建 Channel 对象的,详见 ServerBootstrap#channel(...)
channel = channelFactory.newChannel();

// 子类实现
// 1. 设置了 channel 的 config、attr
// 2. 把 ServerBootstrap 的 handler 加入 pipeline 中
init(channel);

} catch (Throwable t) {
// 异常暂且不管了
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}

// 取出 bossGroup 线程池,注册 channel
// 回忆一下,这个线程池的作用是连接客户端,然后分配给 workerGroup 执行
// 这一步执行完,channel 就完全可用了
ChannelFuture regFuture = config().group().register(channel);

// ChannelFuture 的 cause 变量是异常信息,这里是在处理异常
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}

// 返回的是 channel 注册的结果(DefaultChannelPromise)
return regFuture;
}

/**
* ServerBootstrap # init
*
* 1.设置 channel 的 config 和 attr
* 2.往 pipeline 中加入 handler
*/
@Override
void init(Channel channel) {
// 设置 AbstractBootstrap 父类的参数
// 把 option 设置到 channel 的 config 里、把 attr 设置到 channel 的 attr 里
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));

// 获取刚刚创建的 Channel 对象的 pipeline
// 现在它只有 head 和 tail 两个 handler 节点,下面将在中间添加一个新节点
ChannelPipeline p = channel.pipeline();

// 获取了一些 ServerBootstrap 子类的成员变量
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
}
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);

// 在刚刚创建的 Channel 对象的 pipeline 中增加一个 handler
p.addLast(new ChannelInitializer<Channel>() {

// 这个方法之后会调到,传进来的 channel 就是刚才的 channel(此时它已经 JDK 底层注册 selector,并且有 eventLoop 了)
@Override
public void initChannel(final Channel ch) {
// 拿到 channel 的 pipeline
final ChannelPipeline pipeline = ch.pipeline();

// 把最初 ServerBootstrap 中设置的 handler 参数,加到 pipeline 中
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

// 这是 channel 未来将有的 eventLoop,channel 执行的所有异步任务都交给它做
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 往 pipeline 中加入了一个 ServerBootstrapAcceptor,这个类继承自 ChannelInboundHandlerAdapter
// 这个 handler 的作用是接收客户端的请求
pipeline.addLast(new ServerBootstrap.ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

/* —————————————————————————— bind -pipeline#addLast —————————————————————————— */

/**
* DefaultChannelPipeline # addLast
*/
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
// 第一个参数是 executor(线程池)
// 如果是 null,之后会默认使用 channel 绑定的 eventLoop
// 如果不是 null,可以指定一个 EventExecutorGroup(避免在 ChannelHandler 中使用阻塞操作)
return addLast(null, handlers);
}

/**
* DefaultChannelPipeline # addLast
*/
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
ObjectUtil.checkNotNull(handlers, "handlers");

// 对多个 handler 逐个执行 addLast(...) 方法
// 目前只有一个 handler,就是 ServerBootstrap 的 handler(此处是 LoggingHandler 对象)
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}

return this;
}

/**
* DefaultChannelPipeline # addLast
*
* 1.将 handler 加到 pipeline 链表中除了 tail 节点外的最后一个
* 2.执行 handler 加入链表成功的回调事件(不一定直接能执行)
*/
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 检查 handler 是否重复添加、是否可以重复添加
checkMultiplicity(handler);

// 将 handler 包装成 handlerContext(带 pipeline 上下文的 handler)
// 后面不使用 handler,转而使用包装后的 handlerContext,以便 handler 能拿到 pipeline
newCtx = newContext(group, filterName(name, handler), handler);

// 将包装过的 handler 加入 pipeline 链表当中
// 实现就是 [xxx]<->[tail] 变成 [xxx]<->[handlerContext]<->[tail]
addLast0(newCtx);


// —————————————————————————————————————————————————————
// handler 加入 pipeline 之后,会执行 handler 的回调事件,比如 ChannelInitializer 就会把自己删除
// 下面是在执行 handler 加入后的回调事件(回调事件是提交给线程池,异步执行的)
// 下面有三条路,不论那条最后都会执行 callHandlerAdded0(...) 方法,也就是执行回调事件


// registered 实际是指 channel 有没有被注册,现在还是 false,以后会看到它被赋值为 true
// 当 channel 还没有被注册的时候,channel 绑定的 eventLoop 可能还没有被创建,即可能还没有线程池
// 因此不能执行 handler 的回调事件,那么就先把这个 handler 记录下来,等以后注册完了再执行回调
if (!registered) {
// 设置 handler 的状态为 ADD_PENDING,等待执行回调
newCtx.setAddPending();
// 记录下来这个 handler,等以后 channel 注册完了再执行它的回调事件
callHandlerCallbackLater(newCtx, true);
return this;
}

// 执行到这里代表 channel 被注册过,那么可以让线程池执行 handler 的回调事件了

// 当前线程不是 eventLoop 线程,那么提交给 eventLoop 线程完成
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
// 这个方法就执行了两行代码,handler 设置状态为 ADD_PENDING,然后异步提交 callHandlerAdded0(...) 方法
// newCtx.setAddPending();
// executor.execute(() -> { callHandlerAdded0(newCtx); });
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}

// 当前线程是 eventLoop 线程,直接执行
callHandlerAdded0(newCtx);
return this;
}

/**
* DefaultChannelPipeline # callHandlerCallbackLater
*
* 此时 channel 还没有注册
* 保存 handlerContext,包装成 PendingHandlerCallback,存储在 pipeline 的 pendingHandlerCallbackHead 变量中
*/
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
// 断言:channel 没有注册过(如果注册过,不需要来保存 handler,等待执行回调事件)
assert !registered;

// 将 handlerContext 包装成 PendingHandlerAddedTask
// 这个类实现了 Runnable 接口,执行它的 execute() 方法,就可以执行 handler 的回调事件
// 在后续代码,channel 注册之后,将执行 task.execute(),它内部会执行 callHandlerAdded0(...) 方法

// added 为 true 表示 HandlerAdd 任务,false 表示 HandlerRemove 任务
DefaultChannelPipeline.PendingHandlerCallback task = added ? new DefaultChannelPipeline.PendingHandlerAddedTask(ctx) : new DefaultChannelPipeline.PendingHandlerRemovedTask(ctx);

// pipeline 有一个成员变量:pendingHandlerCallbackHead
// 这个变量存储的是【需要存储下来,等待执行回调事件的 handler】的链表头结点
// 等后续 channel 注册之后,获取 pipeline 的这个变量,逐个触发全部 handler 的回调事件
DefaultChannelPipeline.PendingHandlerCallback pending = pendingHandlerCallbackHead;
if (pending == null) {
pendingHandlerCallbackHead = task;
} else {
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}

/**
* PendingHandlerAddedTask
*
* 后续代码将执行该类的 execute() 方法,内部在执行 callHandlerAdded0(...) 方法
*/
private final class PendingHandlerAddedTask extends DefaultChannelPipeline.PendingHandlerCallback {
PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
// 父类的构造方法:this.ctx = ctx;
super(ctx);
}
@Override
public void run() {
callHandlerAdded0(ctx);
}
@Override
void execute() {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
callHandlerAdded0(ctx);
} else {
try {
executor.execute(this);
} catch (RejectedExecutionException e) {
if (logger.isWarnEnabled()) {
logger.warn("Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.", executor, ctx.name(), e);
}
atomicRemoveFromHandlerList(ctx);
ctx.setRemoved();
}
}
}
}

上面看完了 initAndRegister() 方法的 init 部分,接下来看 register 部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
/**
* MultithreadEventLoopGroup # register
*
* 获取一个 eventLoop,执行它的 register(...) 方法
* 辗转后,实际上是在 channel.unsafe() 中执行的 register(...) 方法
*/
@Override
public ChannelFuture register(Channel channel) {
// 获取一个 EventLoop 线程
// 具体不看了,最后返回 executors[idx.getAndIncrement() & executors.length - 1]
return next()
// 注册,往下看
.register(channel);
}

/**
* SingleThreadEventLoop # register
*/
@Override
public ChannelFuture register(Channel channel) {
// 注册,往下看
return register(
// 这个构造方法不看了,就俩赋值
// this.channel = channel
// this.executor = executor(就是 EventLoop 线程)
new DefaultChannelPromise(channel, this)
);
}

/**
* SingleThreadEventLoop # register
*/
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");

// channel 注册,往下看
promise.channel().unsafe().register(this, promise);

// 注册的结果会放入 promise 中(异步),返回这个结果
return promise;
}

/**
* AbstractChannel.AbstractUnsafe # register
*
* 1.channel 设置 eventLoop 线程
* 2.eventLoop 执行 register0() 方法
*/
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");

// 如果以前注册过,promise 设置失败
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
// 判断 eventLoop 线程类型是否是兼容的,如果不是 NioEventLoop 对象,promise 设置失败
if (!isCompatible(eventLoop)) {
promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}

// 从此之后,channel 就是有 eventLoop 线程的了
// channel 的所有异步操作,都将使用这个 eventLoop 完成
AbstractChannel.this.eventLoop = eventLoop;

// 如果发起 register 的线程就是 eventLoop 里的线程,直接执行 register0()(这里是不会的,以后 unregister 才会)
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// eventLoop 去执行 register0() 方法
eventLoop.execute(() -> { register0(promise); });
} catch (Throwable t) {
// 异常暂且忽略
logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}

/**
* AbstractChannel.AbstractUnsafe # register0
*/
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;

// JDK 底层注册,就执行了一行代码
// selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
// 三个参数分别代表:【JDK 底层的 selector】【对所有事件都不感兴趣】【本 channel,这样以后就可以通过 selector 获取 channel】
doRegister();

neverRegistered = false;
registered = true;

// 将 ChannelInitializer 内部添加的 handlers 添加到 pipeline 中
// 我们接下来相当长的部分,都是在看这个方法
// 这个方法执行完,pipeline 有关服务端的部分就已经全部完成了
pipeline.invokeHandlerAddedIfNeeded();

// 设置 promise 注册完成
safeSetSuccess(promise);

// 此时 channel 已经被注册,通知所有关心这件事的 handler,让它们处理这件事
// fire 这个词很形象,可以翻译成”传火“,让 pipeline 从 head 节点开始,一个个地去处理
// 每一个订阅该事件的 handler 都会执行 channelRegistered(...) 方法,执行完之后找到下一个订阅的节点,让它再去执行
// 这是一个递归的过程,总之就是让所有订阅 channel 注册事件的 handler,都去执行 channelRegistered(...) 方法
// 比如 LoggingHandler 就会在 channelRegistered(...) 方法中执行【logger.log(internalLevel, format(ctx, "REGISTERED"));】
// 这块的代码也比较复杂,不往里看了
pipeline.fireChannelRegistered();

// 如果 JDK 的 channel 已经打开
if (isActive()) {
// 如果首次注册(是的)
if (firstRegistration) {
// pipeline file 所有的 handler 响应 ChannelActive 事件(就是按顺序执行所有 handler 重写的 channelActive(...) 方法)
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// channel 之前就被注册过了,现在立即去监听通道内的 OP_READ 事件
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}

/**
* DefaultChannelPipeline # invokeHandlerAddedIfNeeded
*/
final void invokeHandlerAddedIfNeeded() {
// 断言:当前线程就是 channel 绑定的线程
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
// 在初始化 channel 设置 pipeline 时,我们将 ServerBootstrap 的 handler 变量保存起来
// 因为当时 channel 还没有注册,还没有绑定 eventLoop,无法执行 handler 的回调事件
// 现在可以执行 handler 的回调事件了
callHandlerAddedForAllHandlers();
}
}

/**
* DefaultChannelPipeline # callHandlerAddedForAllHandlers
*/
private void callHandlerAddedForAllHandlers() {
final DefaultChannelPipeline.PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;

// 这是 pipeline 的变量,是指 channel 是否已经注册,可以使用 channel 的 eventLoop 了
// 这个变量我们已经在 pipeline 的 addLast() 方法中见过了,当时是 false,从现在开始变为 true
registered = true;

// 取出当时 addLast(handler) 的 handler,当时没有执行 handler 的回调事件,现在准备来执行
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;

this.pendingHandlerCallbackHead = null;
}

// pipeline 之前存储了一些没有执行回调事件的 handler,他们排成了一个链表,pipeline 存储了链表头
// 现在逐个执行这些 handler 的回调事件
DefaultChannelPipeline.PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
// 执行 handler 的回调事件
// 回去看 pipeline 的 addLast(...) 的代码,会发现这行代码实际上在执行 callHandlerAdded0(...)
task.execute();
task = task.next;
}
}

// 我们到这里回忆一下
// 在之前初始化 channel 时,设置了 pipeline,将 ServerBootstrap 的 handler 包装起来加入到 pipeline 链表中
//
// p.addLast(new ChannelInitializer<Channel>() {
// @Override
// public void initChannel(final Channel ch) {
// ...
// pipeline.addLast(handler);
// ...
// }
// });
//
// 也就是说,pipeline 加入了一个 handler(一个 ChannelInitializer 实例),它里面包着 ServerBootstrap 的 handler
// pipeline 加入了这个 handler 之后,要执行回调事件 callHandlerAdded0(...)
//
// 但是回调事件只能在 channel 绑定的 eventLoop 中执行,此时还没有绑定 eventLoop
// 因此只好先存储一下 handler,等待之后 channel 注册完成,绑定了 eventLoop 后再执行 handler 的回调事件
//
// 当时将 handler 又包装了一层,包装成了 PendingHandlerCallback 实例
// 这是一个实现了 Runnable 接口的类,handler 的回调事件,被放进了 Runnable 的 execute() 方法中
// 这里的 task 实际上就是当初包装后的 PendingHandlerCallback
// 这里执行 task.execute() 方法,实际上就是在执行 handler 的回调事件

/**
* DefaultChannelPipeline # callHandlerAdded0
*
* (省略了一个方法)task.execute() -> callHandlerAdded0(ctx)
*/
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
// 这里的 ctx 就是 handlerContext,之前被包装过的 handler
// 执行它的回调事件
ctx.callHandlerAdded();
} catch (Throwable t) {
// 异常不去看了
boolean removed = false;
try {
atomicRemoveFromHandlerList(ctx);
ctx.callHandlerRemoved();
removed = true;
} catch (Throwable t2) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to remove a handler: " + ctx.name(), t2);
}
}
if (removed) {
fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; removed.", t));
} else {
fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() + ".handlerAdded() has thrown an exception; also failed to remove.", t));
}
}
}

/**
* AbstractChannelHandlerContext # callHandlerAdded
*/
final void callHandlerAdded() throws Exception {
// 先设置 handlerContext 的状态是 ADD_COMPLETE
if (setAddComplete()) {
// 执行内部包着的 handler 的回调事件
handler().handlerAdded(this);
}
}

/**
* ChannelInitializer # handlerAdded
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// 确保 channel 已经被注册
if (ctx.channel().isRegistered()) {
// 执行 initChannel(...) 方法,往里看,柳暗花明
if (initChannel(ctx)) {
// 这里并不是在 pipeline 中删除自己,只是标记一下状态
removeState(ctx);
}
}
}

/**
* ChannelInitializer # initChannel
*/
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
// 防止加载多次,先记录下来这个(避免并发)
if (initMap.add(ctx)) {
try {
// 这个方法我们之前使用过
// ChannelInitializer 是一个抽象类,每次实例化必须要实现它的 initChannel(...) 方法
// 我们之前创建 ChannelInitializer 对象的时候,就重写过这个方法了
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// 异常处理不看了
exceptionCaught(ctx, cause);
} finally {
// 在 pipeline 中删掉 ChannelInitializer 对象,它的作用已经结束了
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}

// 至此,我们的 pipeline 有关服务端的部分已经完成了
// 它的内部是:head <-> ServerBootstrap 的 handler <-> 用于接收客户端请求的 handler <-> tail
}
return true;
}
return false;
}

/**
* ServerBootstrap # init
*
* 回想一下这个方法
* bind() -> doBind() -> initAndRegister() -> init()
* 此时 channel 刚刚创建,正在初始化它,这个方法就是在初始化,设置 channel 的 pipeline
*/
@Override
void init(Channel channel) {
// 设置 channel 的 options 和 attrs 参数
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));

// 取出一堆东西来,准备初始化 pipeline
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Map.Entry<ChannelOption<?>, Object>[] currentChildOptions;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
}
final Map.Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);

// 在 pipeline 中加入一个 ChannelInitializer,这里面包着 ServerBootstrap 的 handler
// 创建 ChannelInitializer 对象,必须要重写它的 initChannel 方法
p.addLast(new ChannelInitializer<Channel>() {

// 我们终于看到这个方法被执行了
// 回忆一下:
// 1. 我们在初始化 channel 时,设置 pipeline,往里面加入了一个 ChannelInitializer 对象,里面装着 handler
// 执行 pipeline.addLast(...) 方法时,将这个被包装的 handler 加入到 pipeline 的尾部
// 2. 每一个 handler 加入 pipeline 之后,都要执行回调事件(对于 ChannelInitializer,是把它内部 handler 们加入到 pipeline 中,再删除掉自己)
// 但是回调事件是需要异步完成的,需要 channel 绑定的 eventLoop 线程来执行,但是此时 channel 还没有被注册
// 因此先把这个 handler 存储下来,先包装成 handlerContext,再包装成 pendingHandlerCallback,以链表的形式存储在 pipeline 中
// 3. 当 channel 被注册之后,channel 有了 eventLoop,可以执行 handler 的回调事件了
// 此时取出 pipeline 的 pendingHandlerCallback,取出里面的 handlerContext
// 给 eventLoop 提交一个任务,让它去执行 handler 的回调事件,ChannelInitializer 的回调事件是执行 initChannel(...),最终一步步走到了这里
@Override
public void initChannel(final Channel ch) {

// 取出 pipeline
final ChannelPipeline pipeline = ch.pipeline();

// 取出 ServerBootstrap 的 handler
ChannelHandler handler = config.handler();

// 将 handler 加入到 pipeline 中,这个 handler 的回调事件可以直接执行
if (handler != null) {
pipeline.addLast(handler);
}

// 这里还往 pipeline 中加入了一个 handler
// 这个 handler 的作用是接收客户端请求
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrap.ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

上面有大量的代码在处理 handler,下面画一张图,来描述一下 handler 是如何从 ServerBootstrap 加入到 pipeline 中的。

netty_pipeline

还有最后一点代码,把 bind0() 方法看完。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
/**
* AbstractBootstrap # doBind
*
* 很早之前的方法了,最开始的时候
* 此时 channel 已经创建并注册好了,要执行绑定端口的操作了
*/
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}

// 此时 channel 已经完成初始化的代码(异步执行,可能还没完成)

// 如果已经完成注册,在当前线程执行 doBind0(...) 方法
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
}
// 如果没有完成注册,给注册 future 增加一个完成后的回调,让它完成后去调用 doBind0(...) 方法
else {
// 此时没有注册完成,channel 还不一定有绑定的 eventLoop,不能让 channel 生成 promise
// 因此创建一个 PendingRegistrationPromise 对象
final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel);

// 怎么加监听,以及完成后处理监听的逻辑就暂且忽略了
regFuture.addListener(new ChannelFutureListener() {
/**
* 完成后的回调方法 operationComplete
*/
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// 如果 channel 注册失败,就记录失败
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
}
// 如果 channel 注册成功,执行 doBind0(...)
else {
// 记录注册成功
promise.registered();
// 执行 doBind0(...) 方法
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}

/**
* AbstractBootstrap # doBind0
*/
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {

channel.eventLoop().execute(() -> {
if (regFuture.isSuccess()) {
// 在 channel 注册成功的前提下,绑定端口
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
});
}

/**
* AbstractChannel # bind
*/
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}

/**
* DefaultChannelPipeline # bind
*/
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}

/**
* AbstractChannelHandlerContext # bind
*/
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
ObjectUtil.checkNotNull(localAddress, "localAddress");

// 检查一下 promise 的类型等要素,如果有问题直接返回
if (isNotValidPromise(promise, false)) {
return promise;
}

// 获取下一个对订阅【绑定事件】的 outbound handler
final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);

// 获取 handler 归属 channel 的 eventLoop,执行 invokeBind(...) 方法
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// 直接加载 invokeBind(...) 方法
next.invokeBind(localAddress, promise);
} else {
// eventLoop 懒加载 invokeBind(...) 方法
safeExecute(executor, () -> { next.invokeBind(localAddress, promise); }, promise, null, false);
}
return promise;
}

/**
* AbstractChannelHandlerContext # invokeBind
*/
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
// 检测该 handler 是否加入到 pipeline 中
if (invokeHandler()) {
try {
// 该 handler 执行绑定事件
// 每个 handler 的执行内容都不相同,比如 LoggingHandler 就是打印 ”BIND“
// 但是每个 handler 执行完之后,都会再次调用 AbstractChannelHandlerContext#bind() 方法,继续执行下去
// 执行到最后,会执行到 head 节点,执行它的 bind(...) 方法,往下看
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
// 如果该 handler 还没有初始化完成,跳过它,执行下一个 handler 的绑定
bind(localAddress, promise);
}
}

/**
* DefaultChannelPipeline$HeadContext # bind
*
* head handler 的 bind(...) 方法
* 真正的绑定端口
*/
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
// 使用 unsafe 类来执行 bind(...) 方法
// 底层是通过 JDK 的 channel 来实现绑定端口的,不看了
unsafe.bind(localAddress, promise);
}

就写到这里吧。