响应式编程

探讨篇

起源

技术发展趋势

背景最近几年,随着GoNode 等新语言、新技术的出现,Java 作为服务器端开发语言老大的地位受到了不小的挑战。虽然Java 的市场地位在短时间内并不会发生改变,但Java 社区还是将挑战视为机遇,并努力、不断地提高自身应对高并发服务器端开发场景的能力

  • 2009 年,微软提出了一个更优雅地实现异步编程的方式—— Reactive Programming ,我们称之为响应式编程
  • JavaScript 语言就在ES6 中通过Promise 机制引入了类似的异步编程方式
  • 2017 年9 月28 日,Spring 5 正式发布。Spring 5 发布最大的意义在于,它将响应式编程技术的普及向前推进了一大步。而同时,作为在背后支持Spring 5 响应式编程的框架Spring Reactor

是什么?

面向流 变化传播 异步编程范式

响应式编程是一种面向数据流和变化传播的异步编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播

– 百度百科

  • :官方的解释是一组有序的数据发布者。所谓流,就是数据的流动、传输。通过各种形式将数据从一个地方带到另一个地方。一次Http请求是流,将请求参数带到服务端。一次数据库查询也是流,提交查询参数返回查询结果…
    • 静态数据流:Java代码中的一个数组、一个List、一次数据库查询结果。它们的特点是数据是固定的,有限的
    • 动态数据流:一个商城的页面埋点就是一个动态数据流(无限流),你永远不知道数据什么时候到来,什么时候结束
  • 变化传播:一个值改变之后,会像多米诺骨牌一样,导致直接和间接引用它的值均发生相应变化

有什么特点?

  • 异步编程:提供了合适的异步编程模型,能够挖掘多核 CPU 的能力、提高效率、降低延迟和阻塞等
  • 数据流:基于数据流模型,响应式编程提供一套统一的 Stream 风格的数据处理接口
  • 变化传播:简单来说就是以一个数据流为输入,经过一连串操作转化为另一个数据流,然后分发给各个订阅者的过程

适用场景

以更少的服务器资源承载更高并发的场景

技术路线

  • ProjectReactor是 Spring 响应式编程的基石,ProjectReactor项目由vmwarePivotal两家公司合作研发
  • Spring Webflux 底层的响应式流实现就依赖于ProjectReactor
  • Spring Cloud Gateway 需要 Spring Boot 和 Spring Webflux 提供的 Netty 运行。它在传统的 Servlet 容器中或构建为 WAR 时不起作用

概念篇 ProjecReactor

官方参考文档地址

中文文档地址(3.2.0)

中文文档源码

reactive-streams 概念

响应式编程思想

响应式编程,就像装配一条流水线。Publisher 规定了数据如何生产,中间会有 Operators(操作符)对流水线的数据进行解析,校验,转换等等操作,最终处理好的数据流转到 Subscriber

发布订阅流程

Publisher 发布者

1
2
3
public  interface  Publisher < T > {
public void subscribe(Subscriber <? super T > s);
}
  • Publisher是一个或无限发布者元素的发布者,根据从其Subscriber接收到的需求发布它们
  • 一个Publisher可以在不同的时间点动态地服务于多个Subscriber的订阅
  • 在被订阅之前,大部分Publisher并不会主动发布元素(冷与热),通过 订阅,可以将 PublisherSubscriber 进行绑定,从而触发整个链中的数据流动。这是在内部实现的,通过单个 request 信号从 Subscriber 传播到上游,一直传回到 Publisher

Mono:一个可以发出[0,1]个元素的异步发布者

Flux:一个可以发出[1,无穷]个元素的异步发布者

Subscriber 订阅者

1
2
3
4
5
6
public  interface  Subscriber < T > {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
  • 在将Subscriber的实例传递给Publisher.subscribe(Subscriber)后,将收到一次对onSubscribe(Subscription)的调用

  • 在调用Subscription.request(long)之前不会收到进一步的通知

  • 单次调用onError(Throwable)或onComplete()发出终止状态信号,之后将不会发送更多事件(订阅终止)

  • 只要Subscriber实例能够处理更多元素,就可以通过Subscription.request(long)发出需求信号(long 是背压数)

BaseSubscriber:如果需要自定义订阅者,推荐继承这个类

Subscription 订阅

1
2
3
4
public interface Subscription {
public void request(long n);
public void cancel();
}
  • 表示Subscriber消费Publisher发布的一个消息的生命周期
  • 它只能由单个Subscriber使用一次
  • 它用于表示对数据的需求和取消需求(并允许资源清理)

Processor 处理器

1
2
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
  • Processor代表一个处理阶段,它既是Subscriber又是Publisher并且遵守两者的契约
  • 也就是说它是一个 订阅元素-发布元素-处理的处理过程(3.5版本开始废弃,改用Sinks

Mono

Mono 使用手册

单元测试

一个可以发出[0,1]个元素的发布者

简单发布订阅

1
2
3
4
5
6
7
@Test
public void test() {
Mono<String> source = Mono.just("test") // 定义发布数据的逻辑
.map(String::toUpperCase); // 操作符,处理发布的数据

source.subscribe(System.out::println); // 订阅数据
}

Flux

单元测试

一个可以发出[1,无穷]个元素的发布者

简单发布订阅

1
2
3
4
5
6
7
@Test
public void test() {
Flux<String> source = Flux.just("test") // 定义发布数据的逻辑
.map(String::toUpperCase); // 操作符,处理发布的数据

source.subscribe(System.out::println); // 订阅数据
}

Disposable

  • 所有基于 lambda 的subscribe()都有一个 Disposable 返回类型。在这种情况下,Disposable 接口表示可以通过调用其 dispose() 方法取消订阅
  • 对于 Flux 或 Mono,dispose() 是源停止生成元素的信号。但是,它不能保证马上取消,某些源可能会非常快地生成元素,因为在接收到取消指令之前源已完成

发布元素

  • just:热发布运算符,在定义后,数据即刻发布(冷与热)

  • generate:同步单线程且每次回调只能发布一个元素

    1
    2
    3
    4
    5
    6
    7
    Flux<String> flux = Flux.generate(
    () -> 0,
    (state, sink) -> { // SynchronousSink
    sink.next("3 x " + state + " = " + 3*state);
    if (state == 10) sink.complete();
    return state + 1;
    });
  • create:异步多线程发布元素

    1
    2
    3
    4
    5
    6
    7
    Flux.create(fluxSink -> {
    for (int i = 0; i < 5; i++) {
    fluxSink.next(i); // 发布元素
    }

    fluxSink.complete(); // 完成发布
    });
  • push:异步单线程发布元素

    1
    2
    3
    4
    Flux.push(fluxSink -> {
    fluxSink.next("test");
    fluxSink.complete();
    });

背压

向上游传播信号也用作实现 背压,我们在组装流水线类比中将其描述为,当工作站的处理速度比上游工作站慢时,沿生产线向上发送反馈信号。背压其实就是一种下游向上游传递信号控制上游元素发布速率的一种手段,目的是均衡上下游的生产消费速率,保证稳定。(相当于MQ的拉取模式)

背压策略

  • IGNORE: 完全忽略下游背压请求,这可能会在下游队列积满的时候导致 IllegalStateException
  • ERROR: 当下游跟不上节奏的时候发出一个 IllegalStateException 的错误信号
  • DROP: 当下游没有准备好接收新的元素的时候抛弃这个元素
  • LATEST: 让下游只得到上游最新的元素
  • BUFFER: (默认的)缓存所有下游没有来得及处理的元素(这个不限大小的缓存可能导致 OutOfMemoryError)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Test
public void test() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Flux<Long> flux = Flux.<Long>create(sink -> {
for (long i = 0; i < 100; i++) {
sink.next(i);
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
sink.complete();
}, FluxSink.OverflowStrategy.LATEST)
.publishOn(Schedulers.newSingle("newSingle"), 1);

flux.subscribe(new BaseSubscriber<Long>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
// 订阅时设置每次请求元素个数
request(1);
}

@Override
protected void hookOnNext(Long value) {
log.info("消费流数据:{}", value);

try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}

// 每处理完1个数据,就再请求1个
request(1);
}

@Override
protected void hookOnError(Throwable throwable) {
log.error(throwable.getMessage(), throwable);
}

@Override
protected void hookOnComplete() {
countDownLatch.countDown();
}
});
countDownLatch.await();
}

冷与热

  • 的发布者为每个订阅者重新生成数据。如果没有创建订阅,则永远不会生成数据。例如,如果源包装了一个 HTTP 调用,则会为每个订阅发出一个新的 HTTP 请求(在订阅前什么都不会发生)
  • 的发布者不会为每个订阅者从头开始。相反,迟到的订阅者会收到他们订阅后发出的信号。但是请注意,一些 反应流可以缓存或重播全部或部分发布的元素。从一般的角度来看,发布者甚至可以在没有订阅者监听时发布(打破在订阅前什么都不会发生,可能立即发布元素,即使没有订阅者)
  • just就是为数不多的热发布者运算符,它在组装时直接捕获值,然后将其重播给任何订阅它的人。再次使用 HTTP 调用类比,如果捕获的数据是 HTTP 调用的结果,那么在实例化时只进行一次网络调用
  • defer可以将热发布者转变为冷发布者,它将我们示例中的 HTTP 请求推迟到订阅时触发(并且会导致对每个新订阅进行单独的网络调用)
  • share() replay(… )可用于将冷发布者变成热发布者(至少在第一次订阅发生后)

冷发布者重播行为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
    @Test
public void clodTest() {
Flux<String> source = Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
.map(String::toUpperCase);

source.subscribe(d -> System.out.println("Subscriber 1: "+d));
source.subscribe(d -> System.out.println("Subscriber 2: "+d));
}
// Subscriber 1: BLUE
// Subscriber 1: GREEN
// Subscriber 1: ORANGE
// Subscriber 1: PURPLE

// Subscriber 2: BLUE
// Subscriber 2: GREEN
// Subscriber 2: ORANGE
// Subscriber 2: PURPLE

热发布者广播行为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
    @Test
public void hotTest() {
DirectProcessor<String> hotSource = DirectProcessor.create();
Flux<String> hotFlux = hotSource.map(String::toUpperCase);

// 第一个订阅者
hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));
hotSource.onNext("blue");
hotSource.onNext("green");

// 第二个订阅者
hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: "+d));
hotSource.onNext("orange");
hotSource.onNext("purple");
hotSource.onComplete();
}
// Subscriber 1 to Hot Source: BLUE
// Subscriber 1 to Hot Source: GREEN
// Subscriber 1 to Hot Source: ORANGE
// Subscriber 2 to Hot Source: ORANGE
// Subscriber 1 to Hot Source: PURPLE
// Subscriber 2 to Hot Source: PURPLE

调度器

要用什么线程池由用户自己决定,只提供配置线程池的方法

元素的发布和订阅可以使用publishOnsubscribeOn方法指定使用的调度线程

Reactor, 就像 RxJava,也可以被认为是 并发无关(concurrency agnostic) 的。意思就是, 它并不强制要求任何并发模型。更进一步,它将选择权交给开发者。不过,它还是提供了一些方便 进行并发执行的库

  • 使用当前线程Schedulers.immediate()
  • 可重用的单线程Schedulers.single()。注意,这个方法对所有调用者都提供同一个线程来使用
  • 专用单线程Schedulers.newSingle()。如果想为每个调用都使用一个单独的线程执行则使用这个
  • 弹性线程池Schedulers.elastic()。它根据需要创建一个线程池,重用空闲线程。线程池如果空闲时间过长 (默认为 60s)就会被废弃,没有线程数上限(新版本已废弃)
  • 有界弹性线程池Schedulers.boundedElastic()。可以设置最大线程数和最大任务数的弹性线程池
  • 固定大小线程池(Schedulers.parallel())。所创建线程池的大小与 CPU 个数等同
  • 使用JDK或第三方的线程池 Schedulers.fromExecutorService(ExecutorService)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* Flux.create 异步多线程发布
*/
@Test
public void createTest() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(100);
Scheduler scheduler = Schedulers.boundedElastic();

Flux.create(fluxSink -> {
for (int i = 0; i < 10; i++) {
ThreadUtil.sleep(100, TimeUnit.MILLISECONDS);
log.info("发布元素");
fluxSink.next(i);
}
})
.publishOn(scheduler) // 指定发布的线程池
.subscribeOn(scheduler) // 指定订阅的线程池
.subscribe(data -> {
log.info("订阅元素:{}", data);
countDownLatch.countDown();
});

countDownLatch.await();
}

错误处理

在响应式流中,错误(error)是终止(terminal)事件。当有错误发生时,它会导致流序列停止, 并且错误信号会沿着操作链条向下传递,直至遇到你定义的 Subscriber 及其 onError 方法

Reactor 提供了其他的用于在链中处理错误的方法,即错误处理操作(error-handling operators)

静态缺省值

onErrorReturn:当Publisher出现错误时,返回一个默认值

1
2
3
4
5
6
7
8
9
10
@Test
public void test() {
Mono<Integer> source = Mono.just(10)
.map(item -> item / 0) // 除零错误
.onErrorReturn(0); // 异常返回默认值,有重载方法,可根据异常类型,条件决定返回值

StepVerifier.create(source)
.expectNext(0)
.verifyComplete();
}

捕获并吃掉错误

onErrorComplete:吃掉Error信号,只需将 onError 信号替换为 onComplete 信号即可完成序列

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void test() {
Mono source = Mono.just(1)
.map(item -> {
throw new RuntimeException();
})
.onErrorComplete();

source.subscribe(AssertSubscriber.create()
.assertNoError()
.assertNoValues()
.assertNotComplete());
}

异常处理方法

onErrorResume:当Publisher出现错误时,进行方法回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Test
public void test() {
Mono<String> source = Mono.just("")
.map(item -> {
if (null == item || "".equals(item)) {
throw new NullPointerException();
}

return item;
})
.onErrorResume(this::getDefault);

StepVerifier.create(source)
.expectNext("default")
.verifyComplete();
}

/**
* 异常回调方法
* @param throwable
* @return
*/
private Mono<String> getDefault(Throwable throwable) {
return Mono.just("default");
}

捕获并重新抛出

onErrorMap:捕获异常,并重新转换一个新的异常然后继续向下传递

1
2
3
4
5
6
7
8
9
10
11
12
@Test
public void test() {
Mono source = Mono.just(1)
.map(item -> item / 0)
.onErrorMap(e -> {
return new RuntimeException("error");
});

StepVerifier.create(source)
.expectError(RuntimeException.class)
.verify();
}

异常终止触发行为

doOnError:添加Mono因异常而终止时触发的行为,首先处理程序被执行,然后 onError 信号被传播到下游

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Test
public void test() {
Mono source = Mono.just(1)
.map(item -> {
throw new RuntimeException();
})
.doOnError(throwable -> {
// TODO 异常业务处理
throwable.printStackTrace();
});

StepVerifier.create(source)
.expectError()
.verify();
}

使用Finally

doFinally:Mono因任何原因终止后添加行为触发(包括取消、终止、完成),会传递一个信号量类型通知你

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Test
public void test() {
Mono source = Mono.just(1)
.map(item -> {
throw new RuntimeException();
})
.doFinally(signalType -> {
if (SignalType.ON_ERROR.equals(signalType)) {
System.out.println("异常信号");
}
});

StepVerifier.create(source)
.expectError()
.verify();
}

try-with-resource

using:主要将一个资源使用工厂方法方式为每个订阅者生成资源,第一个参数在订阅时创建资源,第二个参数 一个Mono工厂创建 Mono, 第三个参数 资源清理方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
public void test() {
Mono<Object> source = Mono.using(
() -> 1,
resource -> Mono.error(new RuntimeException()),
item -> {
// 类似 finally,完成或异常后会执行
System.out.println("释放资源");
});

StepVerifier.create(source)
.expectError(RuntimeException.class)
.verify();
}

Exceptions

全局 Reactor 核心异常处理和操作工具,使用工具类传播和拆包异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Test
public void test() {
Mono<Integer> source = Mono.just(1)
.map(item -> {
if (item == 1) {
// 包装传播一个异常
throw Exceptions.propagate(new IllegalArgumentException());
}

return item;
});

source.subscribe(System.out::print, e -> {
// 订阅时处理传播的异常
if (Exceptions.unwrap(e) instanceof IllegalArgumentException) {
System.out.println("参数非法异常");
}
});
}

Sinks

在3.5.0版本之前使用的是ProcessorSinks在新版本之后用于淘汰Processor

在 Reactor 中,sink 是一个允许以独立方式安全手动触发信号的类,创建一个类似于 Publisher 的结构,能够处理多个 Subscriber(单播类型除外)

Sinks.emitNext: 发布元素,EmitFailureHandler,发布失败处理程序

Sinks.Many 终止:(通常通过调用其 emitError(Throwable) 或 emitComplete() 方法),它会允许更多订阅者订阅,但会立即向他们重播终止信号

Sinks.UnicastSpec 单播规则

Sinks.ManySpec 多播规则

Sinks.MulticastSpec 多播规则

Sinks.MulticastReplaySpec 多播重播规则

多线程环安全类

  • Sinks.One
  • Sinks.Many

Sinks 建造者为主要支持的生产者类型提供了一个引导式 API。您会发现 Flux 中的一些行为,例如 onBackpressureBuffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Test
public void test() {
ExecutorService executorService = Executors.newFixedThreadPool(5);

// 定义一个sinks
Sinks.Many<Integer> sinks = Sinks.many().replay().all();

// 多线程手动调用发布元素
executorService.execute(() -> sinks.emitNext(1, Sinks.EmitFailureHandler.FAIL_FAST));
executorService.execute(() -> sinks.emitNext(2, Sinks.EmitFailureHandler.FAIL_FAST));
executorService.execute(() -> sinks.emitNext(3, Sinks.EmitFailureHandler.FAIL_FAST));

// Sinks.Many 可以转为Flux。同理,Sinks.One可以转为Mono
Flux<Integer> flux = sinks.asFlux().log();

// 多线程发布元素,因此输出的顺序不一定是1,2,3
flux.subscribe(System.out::println);
}

单播

仅允许一个订阅者订阅,多个订阅者订阅除了第一个订阅之外,其他订阅者会出现IllegalStateException异常

1
2
3
4
5
6
7
8
9
@Test
public void test() {
Sinks.Many<Integer> sinks = Sinks.many().unicast().onBackpressureBuffer();
sinks.emitNext(1, Sinks.EmitFailureHandler.FAIL_FAST);

Flux<Integer> flux = sinks.asFlux();
flux.subscribe(AssertSubscriber.create().assertNoError().assertNotComplete());
flux.subscribe(AssertSubscriber.create().assertError(IllegalStateException.class).assertNotComplete());
}

多播

它只会将新推送的数据传输给它的多个订阅者,同时为每个订阅者提供背压

1
2
3
4
5
6
7
8
9
@Test
public void test() {
Sinks.Many<Integer> sinks = Sinks.many().multicast().onBackpressureBuffer();
sinks.emitNext(1, Sinks.EmitFailureHandler.FAIL_FAST);

Flux<Integer> flux = sinks.asFlux();
flux.subscribe(AssertSubscriber.create().assertNoError().assertNotComplete());
flux.subscribe(AssertSubscriber.create().assertNoError().assertNotComplete());
}

重播

它将向多个Subscriber广播,并能够保留和重播所有历史的元素

1
2
3
4
5
6
7
8
9
@Test
public void test() {
Sinks.Many<Integer> sinks = Sinks.many().replay().all();
sinks.emitNext(1, Sinks.EmitFailureHandler.FAIL_FAST);

Flux<Integer> flux = sinks.asFlux();
flux.subscribe(AssertSubscriber.create().assertNoError().assertNotComplete());
flux.subscribe(AssertSubscriber.create().assertNoError().assertNotComplete());
}

Debug调试

在命令模式开发中,错误代码根据异常堆栈是相对容易定位到的。但是在响应式编程中,由于Publish封装了其操作符,并且调用链可能会很长,根据异常堆栈定位问题相对困难

1
2
3
4
5
6
7
8
9
@Test
public void test3() {
Mono<String> source = Flux.just("hello", "world")
.filter(item -> null != item)
.map(item -> item.toLowerCase())
.single();

Disposable subscribe = source.subscribe();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Caused by: java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:134)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onNext(FluxFilterFuseable.java:118)
at reactor.core.publisher.FluxArray$ArrayConditionalSubscription.fastPath(FluxArray.java:340)
at reactor.core.publisher.FluxArray$ArrayConditionalSubscription.request(FluxArray.java:263)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.request(FluxFilterFuseable.java:191)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
at reactor.core.publisher.MonoSingle$SingleSubscriber.doOnRequest(MonoSingle.java:103)
at reactor.core.publisher.Operators$MonoInnerProducerBase.request(Operators.java:2841)
at reactor.core.publisher.LambdaMonoSubscriber.onSubscribe(LambdaMonoSubscriber.java:121)
at reactor.core.publisher.MonoSingle$SingleSubscriber.onSubscribe(MonoSingle.java:115)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onSubscribe(FluxFilterFuseable.java:87)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:50)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.Mono.subscribe(Mono.java:4716)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4784)
at reactor.core.publisher.Mono.subscribe(Mono.java:4544)
at reactor.core.publisherMonoBlockTest.test3(MonoBlockTest.java:66)

从异常堆栈中我们得到一个IndexOutOfBoundsException,它告诉我们这个源发布了太多的元素。我们只能猜测是由于Flux发布了多个元素最终调用single方法收敛为一个Mono时元素过多,但是在这个堆栈中我们提取不到任何有价值的信息,这显然对我们进行调试很不友好

激活调试模式

尽管堆栈跟踪仍然能够提取到一些有价值的信息,但我们可以看到,在更高级的情况下,它本身并不理想。幸运的是,Reactor 带有专为调试而设计的汇编时检测,这是通过在应用程序启动时通过 Hooks.onOperatorDebug() 方法激活全局调试模式来完成的(或者至少在可以实例化有异常的 Flux 或 Mono 之前)

通过包装操作符的构造并在那里捕获堆栈跟踪来检测对 Reactor 操作符方法的调用(它们被组装到链中)。由于这是在声明运算符链时完成的,因此应该在此之前激活Hook,因此最安全的方法是在应用程序开始时立即激活它

现在继续沿用上面的那段异常代码,我们加入激活调试模式再看看异常堆栈

1
2
3
4
5
6
7
8
9
10
@Test
public void test3() {
Hooks.onOperatorDebug();
Mono<String> source = Flux.just("hello", "world")
.filter(item -> null != item)
.map(item -> item.toLowerCase())
.single(); // 代码第65行

Disposable subscribe = source.subscribe();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IndexOutOfBoundsException: Source emitted more than one item
Caused by: java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:134)⑴
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: ⑵
Assembly trace from producer [reactor.core.publisher.MonoSingle] :⑶
reactor.core.publisher.Flux.single(Flux.java:8203)
reactor.core.publisher.MonoBlockTest.test3(MonoBlockTest.java:65)
Error has been observed at the following site(s):⑷
*__Flux.single ⇢ at reactor.core.publisher.MonoBlockTest.test3(MonoBlockTest.java:65)⑸
Original Stack Trace:⑹
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:134)⑺
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onNext(FluxFilterFuseable.java:118)
  • ⑴:原始堆栈跟踪到MonoSingle,也就是说我们的元素被截断
  • ⑵:我们看到了捕获堆栈的包装器运算符。这是回溯开始出现的地方
  • ⑶:我们得到一些关于操作位置的细节MonoSingle
  • ⑷:我们得到了错误传播的运算符链的观点,从头到尾(错误站点到订阅站点)
  • ⑸:提到了错误的每个操作以及使用它的用户类和所在行数,这里我们有一个“根”
  • ⑹:跟踪堆栈的其余部分
  • ⑺:展示了一些操作的内部结构

最终根据“根”的提示和分析,我们定位到了single()方法出现异常

checkpoint

调试模式是全局的,会影响应用程序中组装到 Flux 或 Mono 中的每个运算符

正如我们之前看到的,这种全局知识是以影响性能为代价的(由于填充的堆栈跟踪的数量)。如果我们知道可能存在问题的操作,则可以降低该成本。但是,我们通常不知道哪些操作符可能有问题,除非复现BUG调试

可以将checkpoint()运算符链接到方法链中。检查点运算符的工作方式类似于Hook,但仅适用于该特定的方法链

1
2
3
4
5
6
7
8
9
10
11
@Test
public void test3() {
Mono<String> source = Flux.just("hello", "world")
.filter(item -> null != item)
.map(item -> item.toLowerCase())
.checkpoint("1")
.single()
.checkpoint("2");

Disposable subscribe = source.subscribe();
}

我们可以在一个方法链中插入一些检查点,最终这些检查点会协助我们定位到出现问题的代码,上面代码插入2个检查点,最终在异常堆栈中只输出检查点2,因此我们可以定位问题介于检查点1和2之间

1
2
3
4
5
6
7
8
9
10
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IndexOutOfBoundsException: Source emitted more than one item
Caused by: java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:134)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ 2
Original Stack Trace:
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:134)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onNext(FluxFilterFuseable.java:118)

现成的全局调试方案

Project Reactor 附带一个单独的 Java 代理,它可以检测您的代码并添加调试信息,而无需在每次操作调用时捕获堆栈跟踪。该行为与激活调试模式非常相似 - 也称为回溯,但没有运行时性能开销

  1. 它需要使用Maven进行依赖集成

    1
    2
    3
    4
    <dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-tools</artifactId>
    </dependency>
  2. 它还需要显式初始化

    1
    ReactorDebugAgent.init();

    由于该实现将在加载类时对其进行检测,因此最好将其放置在 main(String[]) 方法中的其他所有内容之前

    1
    2
    3
    4
    public static void main(String[] args) {
    ReactorDebugAgent.init();
    SpringApplication.run(Application.class, args);
    }

    ReactorDebugAgent 作为 Java Agent 实现,并使用 ByteBuddy 进行字节码扩展。可能无法在某些 JVM 上工作,请参阅 ByteBuddy 的文档以获取更多详细信息

    如果当前开发环境不支持 ByteBuddy 字节码扩展,可以将 reactor-tools 作为 Java Agent 运行

    1
    java -javaagent reactor-tools.jar -jar app.jar

修改代码

1
2
3
4
5
6
7
8
9
10
@Test
public void test() {
ReactorDebugAgent.init();
Mono<String> source = Flux.just("hello", "world")
.filter(item -> null != item)
.map(item -> item.toLowerCase())
.single();

Disposable subscribe = source.subscribe();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Test class reactor.tools.agent.ReactorDebugJavaAgentTest
[ERROR] (Test worker) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IndexOutOfBoundsException: Source emitted more than one item
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IndexOutOfBoundsException: Source emitted more than one item
Caused by: java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:134)
Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below:
Assembly trace from producer [reactor.core.publisher.MonoSingle] :
reactor.core.publisher.Flux.single
reactor.tools.agent.ReactorDebugJavaAgentTest.test(ReactorDebugJavaAgentTest.java:62)
Error has been observed at the following site(s):
*__Flux.single ⇢ at reactor.tools.agent.ReactorDebugJavaAgentTest.test(ReactorDebugJavaAgentTest.java:62) // 根
Original Stack Trace:
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:134)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onNext(FluxFilterFuseable.java:118)

日志记录

日志操作使用 Loggers 类,它通过 SLF4J 选择常见的日志记录框架,例如 Log4J 和 Logback,如果 SLF4J 不可用,则默认记录到控制台

在生产环境中使用,应该注意配置底层日志记录框架以使用其最异步和非阻塞的方法——例如,Logback 中的 AsyncAppender 或 Log4j 2 中的 AsyncLogge

1
2
3
4
5
6
7
8
9
10
@Test
public void test3() {
Mono<String> source = Flux.just("hello", "world")
.log()
.filter(item -> null != item)
.map(item -> item.toLowerCase())
.single();

Disposable subscribe = source.subscribe();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
Test class reactor.core.publisher.MonoBlockTest
00:08:19.834 [Test worker] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArrayConditionalSubscription)
00:08:19.846 [Test worker] INFO reactor.Flux.Array.1 - | request(unbounded)
00:08:19.847 [Test worker] INFO reactor.Flux.Array.1 - | onNext(hello)
00:08:19.848 [Test worker] INFO reactor.Flux.Array.1 - | onNext(world)
00:08:19.848 [Test worker] INFO reactor.Flux.Array.1 - | cancel() // 在获取第二个元素后流被取消
00:08:19.860 [Test worker] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IndexOutOfBoundsException: Source emitted more than one item
Caused by: java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:134)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onNext(FluxFilterFuseable.java:118)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onNext(FluxPeekFuseable.java:503)

通过log日志定位后,我们能够定位到当在获取第二个元素时流被取消,结合异常信息和源码,我们知道在调用single()方法时Flux存在多个元素

性能指标监控

TODO(依赖未发布到Maven仓库)

高级特性

高级特性

上下文传播

3.5.0版本之后内置了Context-Propagation第三方上下文传播库,能够实现ThreadLocal->Context的数据传播

数据库篇

R2DBC

响应式的关系型数据库驱动规范

  • 全称是Reactive Relational Database Connectivity 响应式关系型数据库连接

  • JDBC一样,R2DBC是使用响应式编程实现的关系型数据库驱动规范,是非阻塞型的。而JDBC则是传统编程实现的阻塞型驱动规范

  • 注意:如果将传统项目升级为响应式项目,那么最大的阻力就是驱动切换,只有将Spring MVC 升级为webflux并且将JDBC启动替换为R2DBC才是一个响应式项目。如果单升级webflux的关系型数据库项目将毫无意义,不能发挥响应式编程的特性

已经实现R2DBC驱动的数据库

已经实现或在调研中的ORM客户端

spring-data-r2dbc

spring-data-r2dbc是spring-data项目的一部分(大名顶顶的spring-data项目几乎支持了市面上主流的数据库),实现了R2DBCJPA协议,是一个响应式的关系型数据库的ORM框架,可以很轻松使用JPA规范去编写响应式数据库操作(JPA规范比较有名的实现是Hibernate)

reactive-mybatis

基于mybatis-r2dbc改造实现的响应式版mybatis,实现了ORM框架基本功能,目前无法使用Mybatis插件功能

实践篇

WebFlux

官方参考文档地址

国人翻译wiki

国人翻译

背景

项目

ruoyi-webflux-r2dbc-vue3

若依是一套全部开源的快速开发平台,毫无保留给个人及企业免费使用。

  • 这是 RuoYi WebFlux + R2DBC + Vue3 的实现。
  • 前端技术栈: Vue3 + Element Plus + Vite 是分支于 https://github.com/yangzongzhuan/RuoYi-Vue3 (版本: 3.8.2)。
  • 后端采用 Spring Boot、Spring Security、Redis & Jwt。
  • 后端采用 Spring Boot WebFlux、R2DBC、MyBatis、MyBatis-R2DBC、Mysql。
  • 权限认证使用 Jwt,支持多终端认证系统。
  • 支持加载动态权限菜单,多方式轻松权限控制。
  • 高效率开发,使用代码生成器可以一键生成前后端代码。

内置功能

  1. 用户管理:用户是系统操作者,该功能主要完成系统用户配置。
  2. 部门管理:配置系统组织机构(公司、部门、小组),树结构展现支持数据权限。
  3. 岗位管理:配置系统用户所属担任职务。
  4. 菜单管理:配置系统菜单,操作权限,按钮权限标识等。
  5. 角色管理:角色菜单权限分配、设置角色按机构进行数据范围权限划分。
  6. 字典管理:对系统中经常使用的一些较为固定的数据进行维护。
  7. 参数管理:对系统动态配置常用参数。
  8. 通知公告:系统通知公告信息发布维护。
  9. 操作日志:系统正常操作日志记录和查询;系统异常信息日志记录和查询。
  10. 登录日志:系统登录日志记录查询包含登录异常。
  11. 在线用户:当前系统中活跃用户状态监控。
  12. 定时任务:在线(添加、修改、删除)任务调度包含执行结果日志。
  13. 代码生成:前后端代码的生成(java、html、xml、sql)支持 CRUD 下载 。
  14. 系统接口:根据业务代码自动生成相关的 api 接口文档。
  15. 服务监控:监视当前系统 CPU、内存、磁盘、堆栈等相关信息。
  16. 缓存监控:对系统的缓存信息查询,命令统计等。
  17. 在线构建器:拖动表单元素生成相应的 HTML 代码。
  18. 连接池监视:不支持此功能。

微服务

Reactive Feign


响应式编程
https://wugengfeng.cn/2022/11/12/响应式编程/
作者
wugengfeng
发布于
2022年11月12日
许可协议