Spring Webflux 入门
真是混沌的一个月呀,不管第几周啦,就当做六月的最后一个周吧。
这个周入门 Spring Webflux,归纳学习响应式编程,但只从两个最常用的两个类:Mono 和 Flux 入手,关注一下基本使用。
响应式编程(Reactive programming)是一种不好形容的编程模式,常见的描述语或定义语中会出现:异步、非阻塞、传递变化、支持背压。我觉得自己的理解还不到位,决定本篇不定义或是阐述响应式编程是什么,只入个门,之后回来补这部分功课。
抛开定义,仅从代码的形式上来看,响应式编程跟 JDK 8 中常见的 stream 流,在使用上很接近,例如下面这段响应式代码:
1 | Mono.just(jsonParam) |
我在工作中使用的响应式编程框架是 Spring Webflux,它基于 Reactor 3 实现(由 Spring 母公司 Pivotal 的某团队开发),论辈分可能是第四代,或者是第五代。
响应式编程的历史渊源有点复杂,简单来说,最早是微软一帮人在 2010 年写出了 Rx.NET,运行在 .NET 平台上,后来在各个平台上都做了类似的实现,例如 RxJs、RxJava等(Rx 是 ReactiveX 的缩写),其中 RxJava 就是在 Java 平台上的实现。后来出现了 Project Reactor、Akka-Streams 之类的响应式框架。在 2013 年末,Netflix、Pivotal、Typesafe 等公司开始牵头制定响应式流规范,并在 2015 年正式发布了 Reactive Streams 规范,这个规范直接表现在 JDK 9 JUC 包中的 Flow API。如今的响应式框架基本都实现了 Reactive Streams 规范,业界主流的是 RxJava 2 和 Reactor 3。
响应式编程的历史还可以参考这篇文章《Advanced Reactive Java》(偶然搜到的,不知是何方神圣)。
本篇学习的内容是 Reactor 中的 Mono 和 Flux 两个类的使用,我也懒得介绍这两个类了,下面的内容会很干。
Mono
创建
方法 | 示例 | 描述 | 备注 |
---|---|---|---|
just | Mono.just(user)… | 把元素加入流中 | 元素不能是null,否则抛空指针 |
justOrEmpty | Mono.justOrEmpty(user)… | 把元素(可能为null)加入到流中 | 元素如果是null,返回Mono.empty() |
Mono.justOrEmpty(optional)… | 把元素(Optional对象)加入到流中 | 元素如果是空,返回Mono.empty() | |
defer | Mono.defer(() -> Mono.error(new RuntimeException(“懒异常”)))… | 把Mono元素加入流中,但是延时加载,直到有subscribe时才会加载元素 | 可以理解为懒加载,当subscribe时才加载元素(每次subscribe都会重新加载)。 例如有一个用法是,当需要定义异常Mono.error(),并不需要每次都创建,而是在出现异常时才加载,可以节省内存 |
deferWithContext | Mono.deferWithContext(context -> Mono.just(“test”)))… | 与defer()方法几乎完全相同,只是接受参数变成了Function,可以拿到流的context | context来自reactor.util.context包,代表流的上下文,用法不明,待后续 |
fromCallable | return Mono.fromCallable(() -> { Thread.sleep(1000 * 3); return “休眠了三秒”; })… | 通过Callable获取元素加入到流中,是支持阻塞的一种实现 | 有很多类似的fromXXX()方法,例如fromFuture()、fromRunnable()等 |
empty | Mono.empty()… | 创建流,但流中没有任何元素 | 后续如果有flatmap()、map()等,都不会执行,但then()等可以执行,可以用于忽略后续数据 |
never | Mono.never(); | 永不停止,不搭理任何数据,就此睡死 | |
error | Mono.error(new RuntimeException(“失败”))… | 将一个异常加入流中,并立即停止后续操作 | 参数是Throwable对象,error立即加载,无论是否用到 |
Mono.error(() -> new RuntimeException(“失败”)) | 将一个异常加入流中,并立即停止后续操作 | 参数是Supplier函数,error懒加载 | |
Mono.delay(Duration.ofSeconds(3))… | 上来就延时 | 延时后返回0 |
转换
方法 | 示例 | 描述 | 备注 |
---|---|---|---|
flatMap | …flatMap(t -> { return Mono.just(t); })… |
将一个Mono转换成另一个Mono(异步) | 参数是一个Function(接收Mono中的值,返回一个新的Mono) |
map | …map(t -> { return t; })… |
将Mono中的内容进行更改 | 参数是一个Function(接收Mono中的值,返回另一个值) |
cast | Mono.just(map) .cast(HashMap.class)… | 将Mono中的对象,从A类转换成B类 | 等价于map(clazz::cast) |
sequenceEqual | Mono.sequenceEqual(monoA, monoB)… | 比较两个Publisher(Mono、Flux)是否相同 | 返回的是一个Mono<Boolean> |
Mono.sequenceEqual(monoA, monoB, (a, b) -> a.length() == b.length())… | 比较两个Publisher(Mono、Flux)是否相同(可以指定条件) | ||
zip | Mono.zip(mono1, mono2, mono3)… | 将多个Mono合并成一个Mono(内部是Tuple对象),可以通过tuple.getT1()等类似方式取出一个个内容 | 如果其中一个Mono发生异常,逻辑同抛出Mono.error(),如果有多个Mono发生异常,将按顺序取第一个异常 |
zipDelayError | Mono.zipDelayError(mono1, mono2, mono3)… | 功能上同zip(),但在处理异常的时候逻辑不同 | 如果多个Mono发生异常,将组合成一个新的异常 |
其他
方法 | 示例 | 描述 | 备注 |
---|---|---|---|
retry | Mono.just(10/0).retry(10)… | 遇到异常重试 | 从头执行,而不是只执行上一步 有很多变种,参数总体与异常类型和重试次数有关 |
repeat | Mono.just(“.”).repeat(10)… | 重复(在非异常情况下),会从Mono变成重复元素的Flux | 有很多变种,并且跟retry一样,都不算最开始的一次 |
Flux
跟 Mono 相同的就不写了,下面的内容全都是 Flux 独有的
创建
方法 | 示例 | 描述 | 备注 |
---|---|---|---|
fromIterable | Flux.fromIterable(list)… | 从一个迭代器对象(实现Iterable接口的对象)中,把所有元素加入Flux中 | 类似的还有fromArray、fromStream |
range | Flux.range(20, 5)… | 把一系列数字加入Flux中,这些数字间距为1 | 例如示例中加入Flux中的数字是20、21、22、23、24,从20开始,一共5个 |
转换
方法 | 示例 | 描述 | 备注 |
---|---|---|---|
index | Flux.fromIterable(list).index()… | 给Flux内的多个元素加下标 | 例如从[“a”,”b”,”c”]变成[{0:”a”},{1:”b”},{2:”c”}],转换后每个T变成一个二元元祖Tuple2<Long, T> |
startWith | .startWith(list)… | 将某些内容塞到Flux的最前面 | 比如原Flux里有[1,2,3],新插入[8,9],然后Flux变成[8,9,1,2,3] |
.startWith(mono)… | 同上,接收一个Publisher对象 | ||
.startWith(t1, t2, t3)… | 同上,接收若干个和Flux中其他元素相同的对象 | ||
concatWith | .concatWith(mono)… | 将某些内容塞到Flux的最后面 | 与startWith的左右相反,但只接收Publisher对象 |
take | Flux.fromIterable(list).take(10)… | 只取前面几个元素,后面的舍弃 | 有很多种变种,例如前N个、某段时间之前、某条件没改变之前等 |
collect | Flux.fromIterable(list).collect(Collectors.toList())… | 把Flux中的多个元素封装成一个Mono集合 | 使用起来跟JDK 8的Stream一个逻辑,例如 list.stream().filter(num -> num > 10).collect(Collectors.toList()); |
collectList | Flux.fromIterable(list).collectList()… | 把Flux中的多个元素封装成一个Mono列表 | 例如从Flux<String> 转变成Mono<List<String>> ,还有很多类似的collect()方法 |
window | Flux.just(1,2,3,4,5,6,7,8,9).window(3)… | 把一个Flux拆成多个Flux,拆后的Flux的元素个数是window内的个数 | 例如左边示例代码,会把[1,2,3,4,5,6,7,8,9]转换成[1,2,3]、[4,5,6]、[7,8,9] |
我发现 Mono 和 Flux 的 Api 实在是太多太多了……我都已经整理三周了,还是没有整理完,不能再拖下去了hhh
本篇文章先写到这里,之后用到哪个新的 Api 就回来接着补充。