T - the type of the single value of this class
public abstract class Mono<T> extends java.lang.Object implements CorePublisher<T>
A Reactive Streams Publisher with basic rx operators that emits at most one item via the onNext signal then terminates with an onComplete signal (successful Mono, with or without value), or only emits a single onError signal (failed Mono).
Most Mono implementations are expected to immediately call Subscriber.onComplete() after having called Subscriber.onNext(T). Mono.never() is an outlier: it doesn't emit any signal, which is not technically forbidden although not terribly useful outside of tests. On the other hand, a combination of onNext and onError is explicitly forbidden.
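The three legal signal combinations described above can be observed with a small sketch. It assumes reactor-core is on the classpath; the class name `MonoSignalsSketch` and the helper `signalsOf` are hypothetical names introduced only for illustration.

```java
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.List;

public class MonoSignalsSketch {

    // Hypothetical helper: records, in order, the signals a Mono delivers
    // to a lambda-based subscriber. The sources used below emit synchronously,
    // so the list is fully populated when subscribe(...) returns.
    static <T> List<String> signalsOf(Mono<T> mono) {
        List<String> signals = new ArrayList<>();
        mono.subscribe(
                value -> signals.add("onNext(" + value + ")"),
                error -> signals.add("onError(" + error.getMessage() + ")"),
                () -> signals.add("onComplete"));
        return signals;
    }

    public static void main(String[] args) {
        // Successful Mono with a value: onNext followed by onComplete.
        System.out.println(signalsOf(Mono.just("a")));
        // Successful Mono without a value: onComplete only.
        System.out.println(signalsOf(Mono.empty()));
        // Failed Mono: a single onError, never combined with onNext.
        System.out.println(signalsOf(Mono.error(new IllegalStateException("boom"))));
    }
}
```

Because all three sources are synchronous, no blocking or waiting is needed here; an asynchronous source would require a different way of collecting the signals.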
The recommended way to learn about the Mono API and discover new operators is through the reference documentation, rather than through this javadoc (as opposed to learning more about individual operators). See the "which operator do I need?" appendix.
The rx operators will offer aliases for input Mono type to preserve the "at most one" property of the resulting Mono. For instance flatMap returns a Mono, while there is a flatMapMany alias with possibly more than 1 emission.
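As a rough illustration of the flatMap / flatMapMany distinction, the following sketch assumes reactor-core is on the classpath. The class `FlatMapAliasSketch` and its two helper methods are hypothetical names chosen for this example.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapAliasSketch {

    // flatMap keeps the "at most one" property: the mapper returns a Mono,
    // and so does the operator.
    static Mono<Integer> lengthOf(String word) {
        return Mono.just(word).flatMap(w -> Mono.just(w.length()));
    }

    // flatMapMany accepts any Publisher from the mapper and returns a Flux,
    // so more than one element may be emitted.
    static Flux<String> lettersOf(String word) {
        return Mono.just(word).flatMapMany(w -> Flux.fromArray(w.split("")));
    }

    public static void main(String[] args) {
        System.out.println(lengthOf("mono").block());
        System.out.println(lettersOf("mono").collectList().block());
    }
}
```

The return types alone tell the caller how many elements to expect, which is the point of having the two aliases.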
Mono<Void> should be used for a Publisher that just completes without any value. It is intended to be used in implementations and return types; input parameters should keep using raw Publisher as much as possible.
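A minimal sketch of that convention, assuming reactor-core is on the classpath: the hypothetical method `drain` takes a raw Publisher as input and exposes only completion through a Mono<Void> return type. The class name, method name and the counter parameter are illustrative assumptions, not part of the Reactor API.

```java
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.concurrent.atomic.AtomicInteger;

public class VoidMonoSketch {

    // Hypothetical helper: the input stays a raw Publisher, the return type
    // is Mono<Void> because callers only care about completion or failure.
    static Mono<Void> drain(Publisher<String> input, AtomicInteger counter) {
        return Flux.from(input)
                .doOnNext(item -> counter.incrementAndGet()) // side effect per element
                .then();                                     // drop values, keep termination
    }

    public static void main(String[] args) {
        AtomicInteger seen = new AtomicInteger();
        // block() on a Mono<Void> returns null once the source has completed.
        Void result = drain(Flux.just("a", "b", "c"), seen).block();
        System.out.println("result=" + result + ", elements seen=" + seen.get());
    }
}
```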
Note that using state in the java.util.function / lambdas used within Mono operators should be avoided, as these may be shared between several Subscribers.
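The following sketch shows one way the shared-state problem can appear and one possible way around it, assuming reactor-core is on the classpath. `SharedStateSketch`, `sharedCounter` and `perSubscriberCounter` are hypothetical names; using `Mono.defer` to build state per subscription is one option among several, not the only remedy.

```java
import reactor.core.publisher.Mono;

import java.util.concurrent.atomic.AtomicInteger;

public class SharedStateSketch {

    // Problematic: the counter is created once at assembly time, so every
    // Subscriber of the returned Mono mutates the same instance.
    static Mono<Integer> sharedCounter() {
        AtomicInteger counter = new AtomicInteger();
        return Mono.fromSupplier(counter::incrementAndGet);
    }

    // Safer: defer runs the supplier for each subscription, so each
    // Subscriber gets its own counter.
    static Mono<Integer> perSubscriberCounter() {
        return Mono.defer(() -> {
            AtomicInteger counter = new AtomicInteger();
            return Mono.fromSupplier(counter::incrementAndGet);
        });
    }

    public static void main(String[] args) {
        Mono<Integer> shared = sharedCounter();
        // The second subscription observes state left behind by the first.
        System.out.println(shared.block() + " then " + shared.block());

        Mono<Integer> isolated = perSubscriberCounter();
        // Both subscriptions start from a fresh counter.
        System.out.println(isolated.block() + " then " + isolated.block());
    }
}
```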
See also: Flux
Constructor and Description |
---|
Mono() |
Modifier and Type | Method and Description |
---|---|
Mono<java.lang.Void> |
and(org.reactivestreams.Publisher<?> other)
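Join the termination signals from this Mono and another source into the returned Mono<Void>; the values they emit are not of interest and are ignored.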
|
<P> P |
as(java.util.function.Function<? super Mono<T>,P> transformer)
Transform this Mono into a target type. |
T |
block()
Subscribe to this Mono and block indefinitely until a next signal is received, then return that value. |
T |
block(java.time.Duration timeout)
Subscribe to this Mono and block until a next signal is received or a timeout expires.
|
java.util.Optional<T> |
blockOptional()
Subscribe to this Mono and block indefinitely until a next signal is received or the Mono completes empty.
|
java.util.Optional<T> |
blockOptional(java.time.Duration timeout)
Subscribe to this Mono and block until a next signal is received, the Mono completes empty or a timeout expires.
|
Mono<T> |
cache()
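Turn this Mono into a hot source and, once it has been subscribed to, cache its last emitted signal indefinitely for further Subscribers.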
|
Mono<T> |
cache(java.time.Duration ttl)
Turn this Mono into a hot source and cache last emitted signals for further Subscriber, with an expiry timeout given by the Duration. |
Mono<T> |
cache(java.time.Duration ttl,
Scheduler timer)
Turn this Mono into a hot source and cache last emitted signals for further Subscriber, with an expiry timeout given by the Duration and checked on the specified Scheduler. |
Mono<T> |
cache(java.util.function.Function<? super T,java.time.Duration> ttlForValue,
java.util.function.Function<java.lang.Throwable,java.time.Duration> ttlForError,
java.util.function.Supplier<java.time.Duration> ttlForEmpty)
Turn this Mono into a hot source and cache last emitted signal for further Subscriber, with an expiry timeout (TTL) that depends on said signal: a separate TTL can be given for a value, an error and an empty completion. |
Mono<T> |
cache(java.util.function.Function<? super T,java.time.Duration> ttlForValue,
java.util.function.Function<java.lang.Throwable,java.time.Duration> ttlForError,
java.util.function.Supplier<java.time.Duration> ttlForEmpty,
Scheduler timer)
Turn this Mono into a hot source and cache last emitted signal for further Subscriber, with an expiry timeout (TTL) that depends on said signal (value, error or empty completion) and is checked on the specified Scheduler. |
Mono<T> |
cacheInvalidateIf(java.util.function.Predicate<? super T> invalidationPredicate)
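Cache the value and replay it to other subscribers; when there are several subscribers, the given Predicate decides whether the currently cached element is invalidated (it is invalidated when the Predicate returns true).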
|
Mono<T> |
cacheInvalidateWhen(java.util.function.Function<? super T,Mono<java.lang.Void>> invalidationTriggerGenerator)
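Cache the onNext signal and replay it to other subscribers, with custom control over cache invalidation via a Mono<Void> trigger generated from the cached value.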
|
Mono<T> |
cacheInvalidateWhen(java.util.function.Function<? super T,Mono<java.lang.Void>> invalidationTriggerGenerator,
java.util.function.Consumer<? super T> onInvalidate)
Cache onNext signal received from the source and replay it to other subscribers, while allowing invalidation via a Mono<Void> companion trigger generated from the currently cached value. The invalidated value is passed to the onInvalidate Consumer. |
Mono<T> |
cancelOn(Scheduler scheduler)
|
<E> Mono<E> |
cast(java.lang.Class<E> clazz)
Cast the current Mono produced type into a target produced type; an error is raised if the value cannot be cast. |
Mono<T> |
checkpoint()
Activate traceback (full assembly tracing) for this particular Mono, in case of an error upstream of the checkpoint. This helps locate a problem when you know which chain is at fault but its upstream or downstream is assembled by other calls. |
Mono<T> |
checkpoint(java.lang.String description)
Activate traceback (assembly marker) for this particular Mono by giving it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint. |
Mono<T> |
checkpoint(java.lang.String description,
boolean forceStackTrace)
Activate traceback (full assembly tracing or the lighter assembly marking depending on the
forceStackTrace option). |
Flux<T> |
concatWith(org.reactivestreams.Publisher<? extends T> other)
Concatenate emissions of this Mono with the provided Publisher (no interleave), returning a Flux. |
Mono<T> |
contextCapture()
If the context-propagation library is on the classpath, this is a convenience shortcut to capture thread local values during the subscription phase and put them in the Context that is visible upstream of this operator. |
Mono<T> |
contextWrite(ContextView contextToAppend)
Enrich the Context visible from downstream for the benefit of upstream operators, by making all values from the provided ContextView visible on top of pairs from downstream. |
Mono<T> |
contextWrite(java.util.function.Function<Context,Context> contextModifier)
|
static <T> Mono<T> |
create(java.util.function.Consumer<MonoSink<T>> callback)
Creates a deferred emitter, driven synchronously or asynchronously through a MonoSink, that can be used with callback-based APIs to signal at most one value, a complete or an error signal. |
Mono<T> |
defaultIfEmpty(T defaultV)
Provide a default single value if this Mono completes without any data.
|
static <T> Mono<T> |
defer(java.util.function.Supplier<? extends Mono<? extends T>> supplier)
Create a Mono provider that will supply a target Mono to subscribe to for each Subscriber downstream. The target is created lazily at subscription time, whereas just(Object) captures its value eagerly. |
static <T> Mono<T> |
deferContextual(java.util.function.Function<ContextView,? extends Mono<? extends T>> contextualMonoFactory)
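Create a Mono provider that will supply a target Mono to subscribe to for each Subscriber downstream, with access to the Context written downstream. If contextWrite(ContextView) is applied at several nesting levels, the values written nearest to this operator take precedence. |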
static Mono<java.lang.Long> |
delay(java.time.Duration duration)
Create a Mono which delays an onNext signal by a given duration on a default Scheduler.
|
static Mono<java.lang.Long> |
delay(java.time.Duration duration,
Scheduler timer)
Create a Mono which delays an onNext signal by a given duration on the specified Scheduler.
|
Mono<T> |
delayElement(java.time.Duration delay)
Delay this Mono element (Subscriber.onNext signal) by a given duration (on the parallel Scheduler).
|
Mono<T> |
delayElement(java.time.Duration delay,
Scheduler timer)
|
Mono<T> |
delaySubscription(java.time.Duration delay)
Delay the subscription to this Mono source until the given period elapses. |
Mono<T> |
delaySubscription(java.time.Duration delay,
Scheduler timer)
|
<U> Mono<T> |
delaySubscription(org.reactivestreams.Publisher<U> subscriptionDelay)
Delay the subscription to this
Mono until another Publisher
signals a value or completes. |
Mono<T> |
delayUntil(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<?>> triggerProvider)
Subscribe to this Mono and another Publisher that is generated from this Mono's element and which will be used as a trigger for relaying said element. |
<X> Mono<X> |
dematerialize()
An operator working only if this Mono emits onNext, onError or onComplete Signal instances, transforming these materialized signals into real signals on the Subscriber. |
Mono<T> |
doAfterTerminate(java.lang.Runnable afterTerminate)
Add behavior (side-effect) triggered after the Mono terminates, either by completing downstream successfully or with an error. |
Mono<T> |
doFinally(java.util.function.Consumer<SignalType> onFinally)
Add behavior triggering after the Mono terminates for any reason, including cancellation. The terminating event is passed to the Consumer as a SignalType. |
Mono<T> |
doFirst(java.lang.Runnable onFirst)
Add behavior triggered before the Mono is subscribed to (before any data is requested), which should be the first event after assembly time.
|
Mono<T> |
doOnCancel(java.lang.Runnable onCancel)
Add behavior triggered when the Mono is cancelled. The Runnable is executed first, then the cancel signal is propagated upstream to the source. |
<R> Mono<T> |
doOnDiscard(java.lang.Class<R> type,
java.util.function.Consumer<? super R> discardHook)
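In very specific cases, your application may deal with types that require some form of cleanup once they are no longer in use; the discardHook is applied to such elements when they are discarded.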
|
Mono<T> |
doOnEach(java.util.function.Consumer<? super Signal<T>> signalConsumer)
Add behavior triggered when the Mono emits an item, fails with an error or completes successfully.
|
<E extends java.lang.Throwable> Mono<T> |
doOnError(java.lang.Class<E> exceptionType,
java.util.function.Consumer<? super E> onError)
Add behavior triggered when the Mono completes with an error matching the given exception type.
|
Mono<T> |
doOnError(java.util.function.Consumer<? super java.lang.Throwable> onError)
Add behavior triggered when the Mono completes with an error. The Consumer is executed first, then the onError signal is propagated downstream. |
Mono<T> |
doOnError(java.util.function.Predicate<? super java.lang.Throwable> predicate,
java.util.function.Consumer<? super java.lang.Throwable> onError)
Add behavior triggered when the Mono completes with an error matching the given Predicate (typed on Throwable). |
Mono<T> |
doOnNext(java.util.function.Consumer<? super T> onNext)
Add behavior triggered when the Mono emits a data successfully. The Consumer is executed first, then the onNext signal is propagated downstream. |
Mono<T> |
doOnRequest(java.util.function.LongConsumer consumer)
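Add behavior triggering a LongConsumer when the Mono receives any request (the long value is the requested amount, i.e. the backpressure demand).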
|
Mono<T> |
doOnSubscribe(java.util.function.Consumer<? super org.reactivestreams.Subscription> onSubscribe)
Add behavior (side-effect) triggered when the Mono is being subscribed, that is to say when a Subscription has been produced by the Publisher and is being passed to the Subscriber.onSubscribe(Subscription). The Consumer receives the Subscription first, then the signal is propagated downstream. |
Mono<T> |
doOnSuccess(java.util.function.Consumer<? super T> onSuccess)
Add behavior triggered as soon as the Mono can be considered to have completed successfully.
|
Mono<T> |
doOnTerminate(java.lang.Runnable onTerminate)
Add behavior triggered when the Mono terminates, either by completing with a value, completing empty or failing with an error. |
Mono<Tuple2<java.lang.Long,T>> |
elapsed()
|
Mono<Tuple2<java.lang.Long,T>> |
elapsed(Scheduler scheduler)
Map this Mono sequence into Tuple2<Long, T> of timemillis and source data, where the first value is the time elapsed until the data was produced, as measured on the given Scheduler. |
static <T> Mono<T> |
empty()
Create a Mono that completes without emitting any item. |
static <T> Mono<T> |
error(java.util.function.Supplier<? extends java.lang.Throwable> errorSupplier)
Create a Mono that terminates with an error immediately after being subscribed to.
|
static <T> Mono<T> |
error(java.lang.Throwable error)
Create a Mono that terminates with the specified error immediately after being subscribed to. |
Flux<T> |
expand(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends T>> expander)
Recursively expand elements into a graph and emit all the resulting elements using
a breadth-first traversal strategy.
|
Flux<T> |
expand(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends T>> expander,
int capacityHint)
Recursively expand elements into a graph and emit all the resulting elements using
a breadth-first traversal strategy.
|
Flux<T> |
expandDeep(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends T>> expander)
Recursively expand elements into a graph and emit all the resulting elements,
in a depth-first traversal order.
|
Flux<T> |
expandDeep(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends T>> expander,
int capacityHint)
Recursively expand elements into a graph and emit all the resulting elements,
in a depth-first traversal order.
|
Mono<T> |
filter(java.util.function.Predicate<? super T> tester)
If this Mono is valued, test the result and replay it if predicate returns true. |
Mono<T> |
filterWhen(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<java.lang.Boolean>> asyncPredicate)
If this Mono is valued, test the value asynchronously using a generated Publisher<Boolean> test. Like filter, except that the predicate result is delivered by a Publisher. |
static <T> Mono<T> |
first(java.lang.Iterable<? extends Mono<? extends T>> monos)
Deprecated. Use firstWithSignal(Iterable). To be removed in reactor 3.5. |
static <T> Mono<T> |
first(Mono<? extends T>... monos)
Deprecated. Use firstWithSignal(Mono[]). To be removed in reactor 3.5. |
static <T> Mono<T> |
firstWithSignal(java.lang.Iterable<? extends Mono<? extends T>> monos)
Pick the first Mono to emit any signal (value, empty completion or error) and replay that signal, effectively behaving like the fastest of these competing sources. |
static <T> Mono<T> |
firstWithSignal(Mono<? extends T>... monos)
Pick the first Mono, among a variable number of sources, to emit any signal (value, empty completion or error) and replay that signal, effectively behaving like the fastest of these competing sources. |
static <T> Mono<T> |
firstWithValue(java.lang.Iterable<? extends Mono<? extends T>> monos)
Given a batch of Monos as an Iterable, pick the first Mono that emits a value.
|
static <T> Mono<T> |
firstWithValue(Mono<? extends T> first,
Mono<? extends T>... others)
Given a batch of Monos, pick the first Mono that emits a value.
|
<R> Mono<R> |
flatMap(java.util.function.Function<? super T,? extends Mono<? extends R>> transformer)
|
<R> Flux<R> |
flatMapIterable(java.util.function.Function<? super T,? extends java.lang.Iterable<? extends R>> mapper)
|
<R> Flux<R> |
flatMapMany(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper)
|
<R> Flux<R> |
flatMapMany(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapperOnNext,
java.util.function.Function<? super java.lang.Throwable,? extends org.reactivestreams.Publisher<? extends R>> mapperOnError,
java.util.function.Supplier<? extends org.reactivestreams.Publisher<? extends R>> mapperOnComplete)
|
Flux<T> |
flux()
|
static <T> Mono<T> |
from(org.reactivestreams.Publisher<? extends T> source)
|
static <T> Mono<T> |
fromCallable(java.util.concurrent.Callable<? extends T> supplier)
|
static <T> Mono<T> |
fromCompletionStage(java.util.concurrent.CompletionStage<? extends T> completionStage)
|
static <T> Mono<T> |
fromCompletionStage(java.util.function.Supplier<? extends java.util.concurrent.CompletionStage<? extends T>> stageSupplier)
Create a Mono that wraps a lazily-supplied CompletionStage on subscription, emitting the value produced by the CompletionStage. |
static <I> Mono<I> |
fromDirect(org.reactivestreams.Publisher<? extends I> source)
Convert a Publisher to a Mono without any cardinality check (that is, this method does not cancel the source past the first element).
|
static <T> Mono<T> |
fromFuture(java.util.concurrent.CompletableFuture<? extends T> future)
Create a Mono, producing its value using the provided CompletableFuture and cancelling the future if the Mono gets cancelled. |
static <T> Mono<T> |
fromFuture(java.util.concurrent.CompletableFuture<? extends T> future,
boolean suppressCancel)
Create a Mono, producing its value using the provided CompletableFuture and optionally cancelling the future if the Mono gets cancelled (if suppressCancel == false). |
static <T> Mono<T> |
fromFuture(java.util.function.Supplier<? extends java.util.concurrent.CompletableFuture<? extends T>> futureSupplier)
Create a Mono that wraps a lazily-supplied CompletableFuture on subscription, emitting the value produced by the future and cancelling the future if the Mono gets cancelled. |
static <T> Mono<T> |
fromFuture(java.util.function.Supplier<? extends java.util.concurrent.CompletableFuture<? extends T>> futureSupplier,
boolean suppressCancel)
Create a Mono that wraps a lazily-supplied CompletableFuture on subscription, emitting the value produced by the future and optionally cancelling the future if the Mono gets cancelled (if suppressCancel == false). |
static <T> Mono<T> |
fromRunnable(java.lang.Runnable runnable)
|
static <T> Mono<T> |
fromSupplier(java.util.function.Supplier<? extends T> supplier)
|
<R> Mono<R> |
handle(java.util.function.BiConsumer<? super T,SynchronousSink<R>> handler)
Handle the items emitted by this Mono by calling a BiConsumer with the output sink for each onNext. Unlike map(Function), the handler can also call SynchronousSink.error(Throwable) or SynchronousSink.complete() on the sink. |
Mono<java.lang.Boolean> |
hasElement()
Emit a single boolean true if this Mono has an element. |
Mono<T> |
hide()
Hides the identity of this Mono instance. |
Mono<T> |
ignoreElement()
Ignores onNext signal (dropping it) and only propagates termination events.
|
static <T> Mono<T> |
ignoreElements(org.reactivestreams.Publisher<T> source)
Create a new Mono that ignores elements from the source (dropping them), but completes when the source completes. |
static <T> Mono<T> |
just(T data)
Create a new Mono that emits the specified item, which is captured at instantiation time. This is the simplest way to create a Mono. |
static <T> Mono<T> |
justOrEmpty(java.util.Optional<? extends T> data)
|
static <T> Mono<T> |
justOrEmpty(T data)
|
Mono<T> |
log()
Observe all Reactive Streams signals and trace them using Logger support. |
Mono<T> |
log(Logger logger)
Observe Reactive Streams signals matching the passed filter
options and
trace them using a specific user-provided Logger , at Level.INFO level. |
Mono<T> |
log(Logger logger,
java.util.logging.Level level,
boolean showOperatorLine,
SignalType... options)
Observe Reactive Streams signals matching the passed filter
options and
trace them using a specific user-provided Logger , at the given Level . |
Mono<T> |
log(java.lang.String category)
Observe all Reactive Streams signals and use
Logger support to handle trace implementation. |
Mono<T> |
log(java.lang.String category,
java.util.logging.Level level,
boolean showOperatorLine,
SignalType... options)
Observe Reactive Streams signals matching the passed filter
options and
use Logger support to
handle trace
implementation. |
Mono<T> |
log(java.lang.String category,
java.util.logging.Level level,
SignalType... options)
Observe Reactive Streams signals matching the passed flags
options and use
Logger support to handle trace implementation. |
<R> Mono<R> |
map(java.util.function.Function<? super T,? extends R> mapper)
Transform the item emitted by this Mono by applying a synchronous function to it. |
<R> Mono<R> |
mapNotNull(java.util.function.Function<? super T,? extends R> mapper)
Transform the item emitted by this Mono by applying a synchronous function to it, which is allowed to produce a null value; a null result is ignored rather than emitted. |
Mono<Signal<T>> |
materialize()
|
Flux<T> |
mergeWith(org.reactivestreams.Publisher<? extends T> other)
Merge emissions of this
Mono with the provided Publisher . |
Mono<T> |
metrics()
Deprecated. Prefer using the tap(SignalListenerFactory) with the SignalListenerFactory provided by the new reactor-core-micrometer module. To be removed in 3.6.0 at the earliest. |
Mono<T> |
name(java.lang.String name)
Give a name to this sequence, which can be retrieved using Scannable.name() as long as this is the first reachable Scannable.parents(). |
static <T> Mono<T> |
never()
|
<U> Mono<U> |
ofType(java.lang.Class<U> clazz)
Evaluate the emitted value against the given Class type; a value that cannot be converted to that type is ignored. |
Mono<T> |
onErrorComplete()
Swallow the error: simply complete the sequence by replacing an onError signal with an onComplete signal.
|
Mono<T> |
onErrorComplete(java.lang.Class<? extends java.lang.Throwable> type)
Simply complete the sequence by replacing an onError signal with an onComplete signal if the error matches the given Class.
|
Mono<T> |
onErrorComplete(java.util.function.Predicate<? super java.lang.Throwable> predicate)
Simply complete the sequence by replacing an onError signal with an onComplete signal if the error matches the given Predicate. |
Mono<T> |
onErrorContinue(java.util.function.BiConsumer<java.lang.Throwable,java.lang.Object> errorConsumer)
Let compatible operators upstream recover from errors by dropping the
incriminating element from the sequence and continuing with subsequent elements.
|
<E extends java.lang.Throwable> Mono<T> |
onErrorContinue(java.lang.Class<E> type,
java.util.function.BiConsumer<java.lang.Throwable,java.lang.Object> errorConsumer)
Let compatible operators upstream recover from errors by dropping the
incriminating element from the sequence and continuing with subsequent elements.
|
<E extends java.lang.Throwable> Mono<T> |
onErrorContinue(java.util.function.Predicate<E> errorPredicate,
java.util.function.BiConsumer<java.lang.Throwable,java.lang.Object> errorConsumer)
Let compatible operators upstream recover from errors by dropping the
incriminating element from the sequence and continuing with subsequent elements.
|
<E extends java.lang.Throwable> Mono<T> |
onErrorMap(java.lang.Class<E> type,
java.util.function.Function<? super E,? extends java.lang.Throwable> mapper)
Transform an error emitted by this Mono by synchronously applying a function to it if the error matches the given type.
|
Mono<T> |
onErrorMap(java.util.function.Function<? super java.lang.Throwable,? extends java.lang.Throwable> mapper)
Transform any error emitted by this Mono by synchronously applying a function to it, for example to convert one exception type into another. |
Mono<T> |
onErrorMap(java.util.function.Predicate<? super java.lang.Throwable> predicate,
java.util.function.Function<? super java.lang.Throwable,? extends java.lang.Throwable> mapper)
Transform an error emitted by this Mono by synchronously applying a function to it if the error matches the given Predicate. |
<E extends java.lang.Throwable> Mono<T> |
onErrorResume(java.lang.Class<E> type,
java.util.function.Function<? super E,? extends Mono<? extends T>> fallback)
Subscribe to a fallback publisher when an error matching the given type
occurs, using a function to choose the fallback depending on the error.
|
Mono<T> |
onErrorResume(java.util.function.Function<? super java.lang.Throwable,? extends Mono<? extends T>> fallback)
Subscribe to a fallback publisher when any error occurs upstream, using a function to
choose the fallback depending on the error.
|
Mono<T> |
onErrorResume(java.util.function.Predicate<? super java.lang.Throwable> predicate,
java.util.function.Function<? super java.lang.Throwable,? extends Mono<? extends T>> fallback)
Subscribe to a fallback publisher when an error matching a given Predicate occurs. |
<E extends java.lang.Throwable> Mono<T> |
onErrorReturn(java.lang.Class<E> type,
T fallbackValue)
Simply emit a captured fallback value when an error of the specified type is observed on this Mono. |
Mono<T> |
onErrorReturn(java.util.function.Predicate<? super java.lang.Throwable> predicate,
T fallbackValue)
Simply emit a captured fallback value when an error matching the given Predicate is observed on this Mono. |
Mono<T> |
onErrorReturn(T fallbackValue)
Simply emit a captured fallback value when any error is observed on this Mono. |
Mono<T> |
onErrorStop()
If an
onErrorContinue(BiConsumer) variant has been used downstream, reverts
to the default 'STOP' mode where errors are terminal events upstream. |
Mono<T> |
onTerminateDetach()
Detaches both the child Subscriber and the Subscription on termination or cancellation. |
Mono<T> |
or(Mono<? extends T> other)
Emit the first available signal from this Mono or the other Mono.
|
<R> Mono<R> |
publish(java.util.function.Function<? super Mono<T>,? extends Mono<? extends R>> transform)
Share a Mono for the duration of a function that may transform it and consume it as many times as necessary without causing multiple subscriptions to the upstream. |
Mono<T> |
publishOn(Scheduler scheduler)
|
Flux<T> |
repeat()
Repeatedly and indefinitely subscribe to the source upon completion of the previous subscription.
|
Flux<T> |
repeat(java.util.function.BooleanSupplier predicate)
Repeatedly subscribe to the source if the BooleanSupplier predicate returns true after completion of the previous subscription. |
Flux<T> |
repeat(long numRepeat)
Repeatedly subscribe to the source numRepeat times.
|
Flux<T> |
repeat(long numRepeat,
java.util.function.BooleanSupplier predicate)
Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription; numRepeat limits the number of re-subscriptions.
|
Flux<T> |
repeatWhen(java.util.function.Function<Flux<java.lang.Long>,? extends org.reactivestreams.Publisher<?>> repeatFactory)
Repeatedly subscribe to this
Mono when a companion sequence emits elements in
response to the flux completion signal. |
Mono<T> |
repeatWhenEmpty(java.util.function.Function<Flux<java.lang.Long>,? extends org.reactivestreams.Publisher<?>> repeatFactory)
|
Mono<T> |
repeatWhenEmpty(int maxRepeat,
java.util.function.Function<Flux<java.lang.Long>,? extends org.reactivestreams.Publisher<?>> repeatFactory)
|
Mono<T> |
retry()
Re-subscribes to this Mono sequence if it signals any error, indefinitely. |
Mono<T> |
retry(long numRetries)
Re-subscribes to this Mono sequence if it signals any error, for a fixed number of times. |
Mono<T> |
retryWhen(Retry retrySpec)
|
static <T> Mono<java.lang.Boolean> |
sequenceEqual(org.reactivestreams.Publisher<? extends T> source1,
org.reactivestreams.Publisher<? extends T> source2)
Returns a Mono that emits a Boolean value that indicates whether two Publisher sequences are the
same by comparing the items emitted by each Publisher pairwise.
|
static <T> Mono<java.lang.Boolean> |
sequenceEqual(org.reactivestreams.Publisher<? extends T> source1,
org.reactivestreams.Publisher<? extends T> source2,
java.util.function.BiPredicate<? super T,? super T> isEqual)
Returns a Mono that emits a Boolean value that indicates whether two Publisher sequences are the
same by comparing the items emitted by each Publisher pairwise based on the results of a specified
equality function.
|
static <T> Mono<java.lang.Boolean> |
sequenceEqual(org.reactivestreams.Publisher<? extends T> source1,
org.reactivestreams.Publisher<? extends T> source2,
java.util.function.BiPredicate<? super T,? super T> isEqual,
int prefetch)
Compare two sequences pairwise, using the supplied element equality function.
|
Mono<T> |
share()
Share this Mono: other Subscribers will share the same Subscription and therefore obtain the same result.
|
Mono<T> |
single()
Expect exactly one item from this Mono source or signal NoSuchElementException for an empty source. |
Disposable |
subscribe()
Subscribe to this
Mono and request unbounded demand. |
Disposable |
subscribe(java.util.function.Consumer<? super T> consumer)
Subscribe a
Consumer to this Mono that will consume all the
sequence. |
Disposable |
subscribe(java.util.function.Consumer<? super T> consumer,
java.util.function.Consumer<? super java.lang.Throwable> errorConsumer)
Subscribe to this
Mono with a Consumer that will consume all the
elements in the sequence, as well as a Consumer that will handle errors. |
Disposable |
subscribe(java.util.function.Consumer<? super T> consumer,
java.util.function.Consumer<? super java.lang.Throwable> errorConsumer,
java.lang.Runnable completeConsumer)
Subscribe
Consumer to this Mono that will respectively consume all the
elements in the sequence, handle errors and react to completion. |
Disposable |
subscribe(java.util.function.Consumer<? super T> consumer,
java.util.function.Consumer<? super java.lang.Throwable> errorConsumer,
java.lang.Runnable completeConsumer,
java.util.function.Consumer<? super org.reactivestreams.Subscription> subscriptionConsumer)
Subscribe
Consumer to this Mono that will respectively consume all the
elements in the sequence, handle errors, react to completion, and request upon subscription. |
Disposable |
subscribe(java.util.function.Consumer<? super T> consumer,
java.util.function.Consumer<? super java.lang.Throwable> errorConsumer,
java.lang.Runnable completeConsumer,
Context initialContext)
Subscribe
Consumer to this Mono that will respectively consume all the
elements in the sequence, handle errors and react to completion. |
abstract void |
subscribe(CoreSubscriber<? super T> actual)
An internal
Publisher.subscribe(Subscriber) that will bypass
Hooks.onLastOperator(Function) pointcut. |
void |
subscribe(org.reactivestreams.Subscriber<? super T> actual) |
Mono<T> |
subscribeOn(Scheduler scheduler)
Run subscribe, onSubscribe and request on a specified Scheduler's Scheduler.Worker. |
<E extends org.reactivestreams.Subscriber<? super T>> E |
subscribeWith(E subscriber)
Subscribe the given
Subscriber to this Mono and return said
Subscriber , allowing subclasses with a richer API to be used fluently. |
Mono<T> |
switchIfEmpty(Mono<? extends T> alternate)
Fallback to an alternative Mono if this Mono completes without data.
|
Mono<T> |
tag(java.lang.String key,
java.lang.String value)
Tag this mono with a key/value pair.
|
Mono<T> |
take(java.time.Duration duration)
Give this Mono a chance to resolve within a specified time frame but complete (empty) if it doesn't, much like a timeout.
|
Mono<T> |
take(java.time.Duration duration,
Scheduler timer)
Give this Mono a chance to resolve within a specified time frame but complete if it
doesn't.
|
Mono<T> |
takeUntilOther(org.reactivestreams.Publisher<?> other)
Give this Mono a chance to resolve before a companion
Publisher emits. |
Mono<T> |
tap(java.util.function.Function<ContextView,SignalListener<T>> listenerGenerator)
Tap into Reactive Streams signals emitted or received by this
Mono and notify a stateful per-Subscriber
SignalListener . |
Mono<T> |
tap(SignalListenerFactory<T,?> listenerFactory)
Tap into Reactive Streams signals emitted or received by this
Mono and notify a stateful per-Subscriber
SignalListener created by the provided SignalListenerFactory . |
Mono<T> |
tap(java.util.function.Supplier<SignalListener<T>> simpleListenerGenerator)
Tap into Reactive Streams signals emitted or received by this
Mono and notify a stateful per-Subscriber
SignalListener . |
Mono<java.lang.Void> |
then()
Return a Mono<Void> which only replays complete and error signals from this Mono. It is typically used to run a follow-up step that does not care about the emitted value. |
<V> Mono<V> |
then(Mono<V> other)
Let this Mono complete then play another Mono; errors are replayed.
|
Mono<java.lang.Void> |
thenEmpty(org.reactivestreams.Publisher<java.lang.Void> other)
Return a Mono<Void> that waits for this Mono to complete then for a supplied Publisher<Void> to also complete. |
<V> Flux<V> |
thenMany(org.reactivestreams.Publisher<V> other)
|
<V> Mono<V> |
thenReturn(V value)
Let this Mono complete successfully, then emit the provided value. |
Mono<Timed<T>> |
timed()
Times this Mono Subscriber.onNext(Object) event, encapsulated into a Timed object that lets downstream consumer look at various time information gathered with nanosecond resolution using the default clock (Schedulers.parallel()): Timed.elapsed(): the time in nanoseconds since subscription, as a Duration. |
Mono<Timed<T>> |
timed(Scheduler clock)
Times this
Mono Subscriber.onNext(Object) event, encapsulated into a Timed object
that lets downstream consumer look at various time information gathered with nanosecond
resolution using the provided Scheduler as a clock:
Timed.elapsed() : the time in nanoseconds since subscription, as a Duration . |
Mono<T> |
timeout(java.time.Duration timeout)
Propagate a TimeoutException in case no item arrives within the given Duration.
|
Mono<T> |
timeout(java.time.Duration timeout,
Mono<? extends T> fallback)
Switch to a fallback Mono in case no item arrives within the given Duration.
|
Mono<T> |
timeout(java.time.Duration timeout,
Mono<? extends T> fallback,
Scheduler timer)
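Switch to a fallback Mono in case no item arrives within the given Duration, as measured on the provided Scheduler.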
|
Mono<T> |
timeout(java.time.Duration timeout,
Scheduler timer)
Signal a TimeoutException error in case an item doesn't arrive before the given period,
as measured on the provided Scheduler. |
<U> Mono<T> |
timeout(org.reactivestreams.Publisher<U> firstTimeout)
Signal a TimeoutException in case the item from this Mono has
not been emitted before the given Publisher emits. |
<U> Mono<T> |
timeout(org.reactivestreams.Publisher<U> firstTimeout,
Mono<? extends T> fallback)
Switch to a fallback Publisher in case the item from this Mono has
not been emitted before the given Publisher emits. |
Mono<Tuple2<java.lang.Long,T>> |
timestamp()
|
Mono<Tuple2<java.lang.Long,T>> |
timestamp(Scheduler scheduler)
|
java.util.concurrent.CompletableFuture<T> |
toFuture()
Transform this Mono into a CompletableFuture completing on onNext or onComplete and failing on
onError. |
java.lang.String |
toString() |
<V> Mono<V> |
transform(java.util.function.Function<? super Mono<T>,? extends org.reactivestreams.Publisher<V>> transformer)
|
<V> Mono<V> |
transformDeferred(java.util.function.Function<? super Mono<T>,? extends org.reactivestreams.Publisher<V>> transformer)
|
<V> Mono<V> |
transformDeferredContextual(java.util.function.BiFunction<? super Mono<T>,? super ContextView,? extends org.reactivestreams.Publisher<V>> transformer)
|
static <T,D> Mono<T> |
using(java.util.concurrent.Callable<? extends D> resourceSupplier,
java.util.function.Function<? super D,? extends Mono<? extends T>> sourceSupplier,
java.util.function.Consumer<? super D> resourceCleanup)
Uses a resource, generated by a supplier for each individual Subscriber, while streaming the value from a
Mono derived from the same resource and makes sure the resource is released if the
sequence terminates or the Subscriber cancels. The first argument creates the resource at subscription time, the second is a factory that derives the Mono from it, and the third cleans the resource up.
|
static <T,D> Mono<T> |
using(java.util.concurrent.Callable<? extends D> resourceSupplier,
java.util.function.Function<? super D,? extends Mono<? extends T>> sourceSupplier,
java.util.function.Consumer<? super D> resourceCleanup,
boolean eager)
Uses a resource, generated by a supplier for each individual Subscriber, while streaming the value from a
Mono derived from the same resource and makes sure the resource is released if the
sequence terminates or the Subscriber cancels.
|
static <T,D> Mono<T> |
usingWhen(org.reactivestreams.Publisher<D> resourceSupplier,
java.util.function.Function<? super D,? extends Mono<? extends T>> resourceClosure,
java.util.function.Function<? super D,? extends org.reactivestreams.Publisher<?>> asyncCleanup)
|
static <T,D> Mono<T> |
usingWhen(org.reactivestreams.Publisher<D> resourceSupplier,
java.util.function.Function<? super D,? extends Mono<? extends T>> resourceClosure,
java.util.function.Function<? super D,? extends org.reactivestreams.Publisher<?>> asyncComplete,
java.util.function.BiFunction<? super D,? super java.lang.Throwable,? extends org.reactivestreams.Publisher<?>> asyncError,
java.util.function.Function<? super D,? extends org.reactivestreams.Publisher<?>> asyncCancel)
Uses a resource, generated by a Publisher for each individual Subscriber,
to derive a Mono. Note that all steps of the operator chain that would need the
resource to be in an open stable state need to be described inside the resourceClosure
Function. |
static Mono<java.lang.Void> |
when(java.lang.Iterable<? extends org.reactivestreams.Publisher<?>> sources)
Aggregate the publishers of the given Iterable into a new Mono that will be
fulfilled when all of the given Publishers have completed. If a source errors, the error is forwarded to the returned Mono.
|
static Mono<java.lang.Void> |
when(org.reactivestreams.Publisher<?>... sources)
Aggregate given publishers into a new Mono that will be fulfilled
when all of the given sources have completed. If a source errors, the error is forwarded to the returned Mono.
|
static Mono<java.lang.Void> |
whenDelayError(java.lang.Iterable<? extends org.reactivestreams.Publisher<?>> sources)
Aggregate given publishers into a new Mono that will be fulfilled when all of the given sources have completed.
|
static Mono<java.lang.Void> |
whenDelayError(org.reactivestreams.Publisher<?>... sources)
Aggregate given publishers into a new Mono that will be fulfilled when all of the given sources have completed.
|
static <R> Mono<R> |
zip(java.util.function.Function<? super java.lang.Object[],? extends R> combinator,
Mono<?>... monos)
Aggregate given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values according to the provided combinator function.
|
static <R> Mono<R> |
zip(java.lang.Iterable<? extends Mono<?>> monos,
java.util.function.Function<? super java.lang.Object[],? extends R> combinator)
Aggregate given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values according to the provided combinator function.
|
static <T1,T2> Mono<Tuple2<T1,T2>> |
zip(Mono<? extends T1> p1,
Mono<? extends T2> p2)
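Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple2.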
|
static <T1,T2,O> Mono<O> |
zip(Mono<? extends T1> p1,
Mono<? extends T2> p2,
java.util.function.BiFunction<? super T1,? super T2,? extends O> combinator)
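Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values as defined by the combinator function.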
|
static <T1,T2,T3> Mono<Tuple3<T1,T2,T3>> |
zip(Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos
have produced an item, aggregating their values into a
Tuple3 . |
static <T1,T2,T3,T4> Mono<Tuple4<T1,T2,T3,T4>> |
zip(Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3,
Mono<? extends T4> p4)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos
have produced an item, aggregating their values into a
Tuple4 . |
static <T1,T2,T3,T4,T5> Mono<Tuple5<T1,T2,T3,T4,T5>> |
zip(Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3,
Mono<? extends T4> p4,
Mono<? extends T5> p5)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos
have produced an item, aggregating their values into a
Tuple5 . |
static <T1,T2,T3,T4,T5,T6> Mono<Tuple6<T1,T2,T3,T4,T5,T6>> |
zip(Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3,
Mono<? extends T4> p4,
Mono<? extends T5> p5,
Mono<? extends T6> p6)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos
have produced an item, aggregating their values into a
Tuple6 . |
static <T1,T2,T3,T4,T5,T6,T7> Mono<Tuple7<T1,T2,T3,T4,T5,T6,T7>> |
zip(Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3,
Mono<? extends T4> p4,
Mono<? extends T5> p5,
Mono<? extends T6> p6,
Mono<? extends T7> p7)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos
have produced an item, aggregating their values into a
Tuple7 . |
static <T1,T2,T3,T4,T5,T6,T7,T8> Mono<Tuple8<T1,T2,T3,T4,T5,T6,T7,T8>> |
zip(Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3,
Mono<? extends T4> p4,
Mono<? extends T5> p5,
Mono<? extends T6> p6,
Mono<? extends T7> p7,
Mono<? extends T8> p8)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos
have produced an item, aggregating their values into a
Tuple8 . |
static <R> Mono<R> |
zipDelayError(java.util.function.Function<? super java.lang.Object[],? extends R> combinator,
Mono<?>... monos)
Merge given monos into a new Mono that will be fulfilled when all of the
given Monos have produced an item, aggregating their values according to
the provided combinator function and delaying errors.
|
static <R> Mono<R> |
zipDelayError(java.lang.Iterable<? extends Mono<?>> monos,
java.util.function.Function<? super java.lang.Object[],? extends R> combinator)
Aggregate given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item.
|
static <T1,T2> Mono<Tuple2<T1,T2>> |
zipDelayError(Mono<? extends T1> p1,
Mono<? extends T2> p2)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple2 and delaying errors.
|
static <T1,T2,T3> Mono<Tuple3<T1,T2,T3>> |
zipDelayError(Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos
have produced an item, aggregating their values into a
Tuple3 and delaying errors. |
static <T1,T2,T3,T4> Mono<Tuple4<T1,T2,T3,T4>> |
zipDelayError(Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3,
Mono<? extends T4> p4)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos
have produced an item, aggregating their values into a
Tuple4 and delaying errors. |
static <T1,T2,T3,T4,T5> Mono<Tuple5<T1,T2,T3,T4,T5>> |
zipDelayError(Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3,
Mono<? extends T4> p4,
Mono<? extends T5> p5)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos
have produced an item, aggregating their values into a
Tuple5 and delaying errors. |
static <T1,T2,T3,T4,T5,T6> Mono<Tuple6<T1,T2,T3,T4,T5,T6>> |
zipDelayError(Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3,
Mono<? extends T4> p4,
Mono<? extends T5> p5,
Mono<? extends T6> p6)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos
have produced an item, aggregating their values into a
Tuple6 and delaying errors. |
static <T1,T2,T3,T4,T5,T6,T7> Mono<Tuple7<T1,T2,T3,T4,T5,T6,T7>> |
zipDelayError(Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3,
Mono<? extends T4> p4,
Mono<? extends T5> p5,
Mono<? extends T6> p6,
Mono<? extends T7> p7)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos
have produced an item, aggregating their values into a
Tuple7 and delaying errors. |
static <T1,T2,T3,T4,T5,T6,T7,T8> Mono<Tuple8<T1,T2,T3,T4,T5,T6,T7,T8>> |
zipDelayError(Mono<? extends T1> p1,
Mono<? extends T2> p2,
Mono<? extends T3> p3,
Mono<? extends T4> p4,
Mono<? extends T5> p5,
Mono<? extends T6> p6,
Mono<? extends T7> p7,
Mono<? extends T8> p8)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos
have produced an item, aggregating their values into a
Tuple8 and delaying errors. |
<T2> Mono<Tuple2<T,T2>> |
zipWhen(java.util.function.Function<T,Mono<? extends T2>> rightGenerator)
Wait for the result from this mono, use it to create a second mono via the
provided
rightGenerator function and combine both results into a Tuple2 . |
<T2,O> Mono<O> |
zipWhen(java.util.function.Function<T,Mono<? extends T2>> rightGenerator,
java.util.function.BiFunction<T,T2,O> combinator)
Wait for the result from this mono, use it to create a second mono via the
provided
rightGenerator function and combine both results into an arbitrary
O object, as defined by the provided combinator function. |
<T2> Mono<Tuple2<T,T2>> |
zipWith(Mono<? extends T2> other)
Combine the result from this mono and another into a
Tuple2 . |
<T2,O> Mono<O> |
zipWith(Mono<? extends T2> other,
java.util.function.BiFunction<? super T,? super T2,? extends O> combinator)
Combine the result from this mono and another into an arbitrary
O object,
as defined by the provided combinator function. |
public static <T> Mono<T> create(java.util.function.Consumer<MonoSink<T>> callback)
Creates a deferred emitter, backed by a MonoSink, that can be used with callback-based APIs to signal at most one value, a complete or an error signal.
Bridging legacy API involves mostly boilerplate code due to the lack of standard types and methods. There are two kinds of API surfaces: 1) addListener/removeListener and 2) callback-handler.
1) addListener/removeListener pairs
To work with such API one has to instantiate the listener,
call the sink from the listener then register it with the source:
    Mono.<String>create(sink -> {
        HttpListener listener = event -> {
            if (event.getResponseCode() >= 400) {
                sink.error(new RuntimeException("Failed"));
            } else {
                String body = event.getBody();
                if (body.isEmpty()) {
                    sink.success();
                } else {
                    sink.success(body.toLowerCase());
                }
            }
        };
        client.addListener(listener);
        // unregister the listener when the Mono terminates or is cancelled
        sink.onDispose(() -> client.removeListener(listener));
    });
Note that this works only with single-value emitting listeners. Otherwise,
all subsequent signals are dropped. You may have to add client.removeListener(this);
to the listener's body.
2) callback handler
This requires a similar instantiation pattern such as above, but usually the
successful completion and error are separated into different methods.
In addition, the legacy API may or may not support some cancellation mechanism.
    Mono.<String>create(sink -> {
        Callback<String> callback = new Callback<String>() {
            @Override
            public void onResult(String data) {
                sink.success(data.toLowerCase());
            }
            @Override
            public void onError(Exception e) {
                sink.error(e);
            }
        };
        // Use one of the two variants below, depending on the legacy API.
        // without cancellation support:
        client.call("query", callback);
        // with cancellation support:
        AutoCloseable cancel = client.call("query", callback);
        sink.onDispose(() -> {
            try {
                cancel.close();
            } catch (Exception ex) {
                Exceptions.onErrorDropped(ex);
            }
        });
    });
public static <T> Mono<T> defer(java.util.function.Supplier<? extends Mono<? extends T>> supplier)
Create a Mono provider that will supply a target Mono to subscribe to for each Subscriber downstream. The supplier is invoked lazily, at subscription time, whereas just(Object) captures its value eagerly.
T - the element type of the returned Mono instance
supplier - a Mono factory
Returns: a deferred Mono
See also: deferContextual(Function)
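The lazy versus eager distinction can be illustrated with a minimal sketch, assuming reactor-core is on the classpath. The class name `DeferVsJustSketch` and the counter are hypothetical and only serve the illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class DeferVsJustSketch {

    // Subscribes twice to an eager Mono and twice to a deferred one,
    // returning the four observed values in order.
    static List<Integer> observe() {
        AtomicInteger counter = new AtomicInteger(); // hypothetical changing state

        // just(...) evaluates its argument once, when the Mono is assembled.
        Mono<Integer> eager = Mono.just(counter.incrementAndGet());
        // defer(...) re-runs the supplier for every Subscriber.
        Mono<Integer> lazy = Mono.defer(() -> Mono.just(counter.incrementAndGet()));

        return Arrays.asList(eager.block(), eager.block(), lazy.block(), lazy.block());
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```

The eager Mono replays the value captured at assembly, while each subscription to the deferred Mono sees a fresh counter value.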
public static <T> Mono<T> deferContextual(java.util.function.Function<ContextView,? extends Mono<? extends T>> contextualMonoFactory)
Create a Mono provider that will supply a target Mono to subscribe to for each Subscriber downstream.
This operator behaves the same way as defer(Supplier), but accepts a Function that will receive the current ContextView as an argument. With contextWrite(ContextView), the nearest context is the one picked up.
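As a hedged illustration of how the ContextView reaches the factory, the sketch below writes the same key twice and reads it back; it assumes reactor-core is available, and the key `"user"` and class name `DeferContextualSketch` are made up for the example.

```java
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

public class DeferContextualSketch {

    // Reads the "user" key from the ContextView visible at subscription time.
    static String greet() {
        return Mono.deferContextual(ctx ->
                        Mono.just("Hello " + ctx.getOrDefault("user", "anonymous")))
                // The write closest to deferContextual is applied last and wins.
                .contextWrite(Context.of("user", "alice"))
                // This write sits further downstream and is overridden for the same key.
                .contextWrite(Context.of("user", "bob"))
                .block();
    }

    // Without any contextWrite, the default value is used.
    static String greetWithoutContext() {
        return Mono.deferContextual(ctx ->
                        Mono.just("Hello " + ctx.getOrDefault("user", "anonymous")))
                .block();
    }

    public static void main(String[] args) {
        System.out.println(greet());
        System.out.println(greetWithoutContext());
    }
}
```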
public static Mono<java.lang.Long> delay(java.time.Duration duration)
Create a Mono which delays an onNext signal by a given duration
on a default Scheduler and completes.
If the demand cannot be produced in time, an onError will be signalled instead.
The delay is introduced through the parallel default Scheduler.
duration - the duration of the delay
Returns: a new Mono
public static Mono<java.lang.Long> delay(java.time.Duration duration, Scheduler timer)
Create a Mono which delays an onNext signal by a given duration on a provided Scheduler and completes.
If the demand cannot be produced in time, an onError will be signalled instead.
public static <T> Mono<T> empty()
Create a Mono that completes without emitting any item.
T - the reified Subscriber type
Returns: a completed Mono
public static <T> Mono<T> error(java.lang.Throwable error)
Create a Mono that terminates with the specified error immediately after being subscribed to.
T - the reified Subscriber type
error - the onError signal
Returns: a failing Mono
public static <T> Mono<T> error(java.util.function.Supplier<? extends java.lang.Throwable> errorSupplier)
Create a Mono that terminates with an error immediately after being subscribed to. The Throwable is generated by a Supplier, invoked each time there is a subscription and allowing for lazy instantiation.
T - the reified Subscriber type
errorSupplier - the error signal Supplier to invoke for each Subscriber
Returns: a failing Mono
@SafeVarargs @Deprecated public static <T> Mono<T> first(Mono<? extends T>... monos)
Deprecated. Use firstWithSignal(Mono[]). To be removed in reactor 3.5.
Pick the first Mono to emit any signal (value, empty completion or error) and replay that signal, effectively behaving like the fastest of these competing sources.
T - The type of the function result.
monos - The deferred monos to use.
Returns: a new Mono behaving like the fastest of its sources.
@Deprecated public static <T> Mono<T> first(java.lang.Iterable<? extends Mono<? extends T>> monos)
Deprecated. Use firstWithSignal(Iterable). To be removed in reactor 3.5.
Pick the first Mono to emit any signal (value, empty completion or error) and replay that signal, effectively behaving like the fastest of these competing sources.
T - The type of the function result.
monos - The deferred monos to use.
Returns: a new Mono behaving like the fastest of its sources.
@SafeVarargs public static <T> Mono<T> firstWithSignal(Mono<? extends T>... monos)
Pick the first Mono to emit any signal (value, empty completion or error) and replay that signal, effectively behaving like the fastest of these competing sources.
T - The type of the function result.
monos - The deferred monos to use.
Returns: a new Mono behaving like the fastest of its sources.
public static <T> Mono<T> firstWithSignal(java.lang.Iterable<? extends Mono<? extends T>> monos)
Pick the first Mono to emit any signal (value, empty completion or error) and replay that signal, effectively behaving like the fastest of these competing sources.
T - The type of the function result.
monos - The deferred monos to use.
Returns: a new Mono behaving like the fastest of its sources.
public static <T> Mono<T> firstWithValue(java.lang.Iterable<? extends Mono<? extends T>> monos)
Pick the first Mono source to emit any value and replay that signal, effectively behaving like the source that first emits an onNext.
Valued sources always "win" over an empty source (one that only emits onComplete) or a failing source (one that only emits onError).
When no source can provide a value, this operator fails with a NoSuchElementException (provided there are at least two sources). This exception has a composite as its cause that can be used to inspect what went wrong with each source (so the composite has as many elements as there are sources).
Exceptions from failing sources are directly reflected in the composite at the index of the failing source. For empty sources, a NoSuchElementException is added at their respective index. One can use Exceptions.unwrapMultiple(topLevel.getCause()) to easily inspect these errors as a List.
Note that like in firstWithSignal(Iterable), an infinite source can be problematic if no other source emits onNext.
T - The type of the element in the sources and the resulting mono
monos - An Iterable of the competing source monos
Returns: a new Mono behaving like the fastest of its sources
@SafeVarargs public static <T> Mono<T> firstWithValue(Mono<? extends T> first, Mono<? extends T>... others)
Pick the first Mono source to emit any value and replay that signal, effectively behaving like the source that first emits an onNext.
Valued sources always "win" over an empty source (one that only emits onComplete) or a failing source (one that only emits onError).
When no source can provide a value, this operator fails with a NoSuchElementException (provided there are at least two sources). This exception has a composite as its cause that can be used to inspect what went wrong with each source (so the composite has as many elements as there are sources).
Exceptions from failing sources are directly reflected in the composite at the index of the failing source. For empty sources, a NoSuchElementException is added at their respective index. One can use Exceptions.unwrapMultiple(topLevel.getCause()) to easily inspect these errors as a List.
Note that like in firstWithSignal(Mono[]), an infinite source can be problematic if no other source emits onNext.
In case the first source is already an array-based firstWithValue(Mono, Mono[]) instance, nesting is avoided: a single new array-based instance is created with all the sources from first plus all the others sources at the same level.
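A small sketch of the "valued sources win" rule, assuming reactor-core is on the classpath; the class name `FirstWithValueSketch` and the sample sources are hypothetical.

```java
import java.time.Duration;
import java.util.NoSuchElementException;
import reactor.core.publisher.Mono;

public class FirstWithValueSketch {

    // The empty and failing sources terminate first, yet the slower valued source wins.
    static String slowValueStillWins() {
        Mono<String> empty = Mono.empty();
        Mono<String> failing = Mono.error(new IllegalStateException("boom"));
        Mono<String> slow = Mono.just("slow").delayElement(Duration.ofMillis(50));
        return Mono.firstWithValue(empty, failing, slow).block();
    }

    // When no source produces a value, the operator fails with NoSuchElementException.
    static boolean failsWhenNoSourceHasValue() {
        try {
            Mono.firstWithValue(
                    Mono.<String>empty(),
                    Mono.<String>error(new IllegalStateException("boom")))
                .block();
            return false;
        } catch (NoSuchElementException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(slowValueStillWins());
        System.out.println(failsWhenNoSourceHasValue());
    }
}
```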
public static <T> Mono<T> from(org.reactivestreams.Publisher<? extends T> source)
Expose the specified Publisher with the Mono API, and ensure it will emit 0 or 1 item. The source emitter will be cancelled on the first `onNext`.
Hooks.onEachOperator(String, Function) and similar assembly hooks are applied unless the source is already a Mono (including Mono that was decorated as a Flux, see Flux.from(Publisher)).
T - the source type
source - the Publisher source
Returns: a Mono
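The "0 or 1 item" guarantee can be sketched as follows, assuming reactor-core is available; `MonoFromSketch` is a hypothetical class name.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MonoFromSketch {

    // Only the first element of a multi-valued source is kept; the source is then cancelled.
    static Integer firstOfMany() {
        return Mono.from(Flux.just(1, 2, 3)).block();
    }

    // An empty source yields an empty Mono.
    static boolean emptySourceStaysEmpty() {
        return !Mono.from(Flux.<Integer>empty()).blockOptional().isPresent();
    }

    public static void main(String[] args) {
        System.out.println(firstOfMany());
        System.out.println(emptySourceStaysEmpty());
    }
}
```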
public static <T> Mono<T> fromCallable(java.util.concurrent.Callable<? extends T> supplier)
Create a Mono producing its value using the provided Callable. If the Callable resolves to null, the resulting Mono completes empty, which is equivalent to empty().
T - type of the expected value
supplier - Callable that will produce the value
Returns: a Mono.
public static <T> Mono<T> fromCompletionStage(java.util.concurrent.CompletionStage<? extends T> completionStage)
Create a Mono, producing its value using the provided CompletionStage.
If the completionStage is also a Future, cancelling the Mono will cancel the future. Use fromFuture(CompletableFuture, boolean) with suppressCancellation set to true if you need to suppress cancellation propagation.
T - type of the expected value
completionStage - CompletionStage that will produce a value (or a null to complete immediately)
Returns: a Mono.
public static <T> Mono<T> fromCompletionStage(java.util.function.Supplier<? extends java.util.concurrent.CompletionStage<? extends T>> stageSupplier)
Create a Mono that wraps a lazily-supplied CompletionStage on subscription, emitting the value produced by the CompletionStage.
If the completionStage is also a Future, cancelling the Mono will cancel the future. Use fromFuture(CompletableFuture, boolean) with suppressCancellation set to true if you need to suppress cancellation propagation.
T - type of the expected value
stageSupplier - The Supplier of a CompletionStage that will produce a value (or a null to complete immediately). This allows lazy triggering of CompletionStage-based APIs.
Returns: a Mono.
public static <I> Mono<I> fromDirect(org.reactivestreams.Publisher<? extends I> source)
Convert a Publisher to a Mono without any cardinality check (i.e. this method doesn't cancel the source past the first element). Conversion transparently returns Mono sources without wrapping and otherwise supports Fuseable sources.
Note this is an advanced interoperability operator that implies you know the Publisher you are converting follows the Mono semantics and only ever emits one element.
Hooks.onEachOperator(String, Function) and similar assembly hooks are applied unless the source is already a Mono.
I - type of the value emitted by the publisher
source - the Mono-compatible Publisher to wrap
Returns: a Mono
public static <T> Mono<T> fromFuture(java.util.concurrent.CompletableFuture<? extends T> future)
Create a Mono, producing its value using the provided CompletableFuture and cancelling the future if the Mono gets cancelled.
Use fromFuture(CompletableFuture, boolean) with suppressCancellation set to true if you need to suppress cancellation propagation.
T - type of the expected value
future - CompletableFuture that will produce a value (or a null to complete immediately)
Returns: a Mono.
See also: fromCompletionStage for a generalization
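The cancellation propagation described above might look like this in practice. This is a sketch under the assumption that reactor-core is on the classpath; `FromFutureSketch` is a hypothetical class name.

```java
import java.util.concurrent.CompletableFuture;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;

public class FromFutureSketch {

    // A completed future simply hands its value to the Mono.
    static String valueFromFuture() {
        return Mono.fromFuture(CompletableFuture.completedFuture("done")).block();
    }

    // Disposing the subscription cancels the Mono, which in turn cancels the pending future.
    static boolean futureCancelledWithMono() {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed here
        Disposable subscription = Mono.fromFuture(pending).subscribe();
        subscription.dispose();
        return pending.isCancelled();
    }

    public static void main(String[] args) {
        System.out.println(valueFromFuture());
        System.out.println(futureCancelledWithMono());
    }
}
```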
public static <T> Mono<T> fromFuture(java.util.concurrent.CompletableFuture<? extends T> future, boolean suppressCancel)
Create a Mono, producing its value using the provided CompletableFuture and optionally cancelling the future if the Mono gets cancelled (if suppressCancel == false).
T - type of the expected value
future - CompletableFuture that will produce a value (or a null to complete immediately)
suppressCancel - true to prevent cancellation of the future when the Mono is cancelled, false otherwise (the default)
Returns: a Mono.
public static <T> Mono<T> fromFuture(java.util.function.Supplier<? extends java.util.concurrent.CompletableFuture<? extends T>> futureSupplier)
Create a Mono that wraps a lazily-supplied CompletableFuture on subscription, emitting the value produced by the future and cancelling the future if the Mono gets cancelled.
T - type of the expected value
futureSupplier - The Supplier of a CompletableFuture that will produce a value (or a null to complete immediately). This allows lazy triggering of future-based APIs.
Returns: a Mono.
See also: fromCompletionStage for a generalization
public static <T> Mono<T> fromFuture(java.util.function.Supplier<? extends java.util.concurrent.CompletableFuture<? extends T>> futureSupplier, boolean suppressCancel)
Create a Mono that wraps a lazily-supplied CompletableFuture on subscription, emitting the value produced by the future and optionally cancelling the future if the Mono gets cancelled (if suppressCancel == false).
T - type of the expected value
futureSupplier - The Supplier of a CompletableFuture that will produce a value (or a null to complete immediately). This allows lazy triggering of future-based APIs.
suppressCancel - true to prevent cancellation of the future when the Mono is cancelled, false otherwise (the default)
Returns: a Mono.
See also: fromCompletionStage for a generalization
public static <T> Mono<T> fromRunnable(java.lang.Runnable runnable)
Create a Mono that completes empty once the provided Runnable has been executed.
T - The generic type of the upstream, which is preserved by this operator
runnable - Runnable that will be executed before emitting the completion signal
Returns: a Mono.
public static <T> Mono<T> fromSupplier(java.util.function.Supplier<? extends T> supplier)
Create a Mono, producing its value using the provided Supplier. If the Supplier resolves to null, the resulting Mono completes empty.
T - type of the expected value
supplier - Supplier that will produce the value
Returns: a Mono.
public static <T> Mono<T> ignoreElements(org.reactivestreams.Publisher<T> source)
Create a new Mono that ignores elements from the source (dropping them), but completes when the source completes.
Discard Support: This operator discards the element from the source.
T - the source type of the ignored data
source - the Publisher to ignore
Returns: a new completable Mono.
public static <T> Mono<T> just(T data)
Create a new Mono that emits the specified item, which is captured at instantiation time.
T - the type of the produced item
data - the only item to onNext
Returns: a Mono.
public static <T> Mono<T> justOrEmpty(@Nullable java.util.Optional<? extends T> data)
Create a new Mono that emits the specified item if Optional.isPresent(), otherwise only emits onComplete (equivalent to empty()).
T - the type of the produced item
data - the Optional item to onNext or onComplete if not present
Returns: a Mono.
public static <T> Mono<T> justOrEmpty(@Nullable T data)
Create a new Mono that emits the specified item if non null, otherwise only emits onComplete (equivalent to empty()).
T - the type of the produced item
data - the item to onNext or onComplete if null
Returns: a Mono.
public static <T> Mono<T> never()
Return a Mono that will never signal any data, error or completion signal, essentially running indefinitely.
T - the Subscriber type target
Returns: a never completing Mono
public static <T> Mono<java.lang.Boolean> sequenceEqual(org.reactivestreams.Publisher<? extends T> source1, org.reactivestreams.Publisher<? extends T> source2)
Returns a Mono that emits a Boolean value that indicates whether two Publisher sequences are the same by comparing the items emitted by each Publisher pairwise.
T - the type of items emitted by each Publisher
source1 - the first Publisher to compare
source2 - the second Publisher to compare
public static <T> Mono<java.lang.Boolean> sequenceEqual(org.reactivestreams.Publisher<? extends T> source1, org.reactivestreams.Publisher<? extends T> source2, java.util.function.BiPredicate<? super T,? super T> isEqual)
Returns a Mono that emits a Boolean value that indicates whether two Publisher sequences are the same by comparing the items emitted by each Publisher pairwise based on the results of a specified equality function.
T - the type of items emitted by each Publisher
source1 - the first Publisher to compare
source2 - the second Publisher to compare
isEqual - a function used to compare items emitted by each Publisher
public static <T> Mono<java.lang.Boolean> sequenceEqual(org.reactivestreams.Publisher<? extends T> source1, org.reactivestreams.Publisher<? extends T> source2, java.util.function.BiPredicate<? super T,? super T> isEqual, int prefetch)
Returns a Mono that emits a Boolean value that indicates whether two Publisher sequences are the same by comparing the items emitted by each Publisher pairwise based on the results of a specified equality function.
T - the type of items emitted by each Publisher
source1 - the first Publisher to compare
source2 - the second Publisher to compare
isEqual - a function used to compare items emitted by each Publisher
prefetch - the number of items to prefetch from the first and second source Publisher
public static <T,D> Mono<T> using(java.util.concurrent.Callable<? extends D> resourceSupplier, java.util.function.Function<? super D,? extends Mono<? extends T>> sourceSupplier, java.util.function.Consumer<? super D> resourceCleanup, boolean eager)
Uses a resource, generated by a supplier for each individual Subscriber, while streaming the value from a Mono derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.
Unlike in Flux, in the case of a valued Mono the cleanup happens just before passing the value to downstream. In all cases, exceptions raised by the eager cleanup Consumer may override the terminal event, discarding the element if the derived Mono was valued.
T - emitted type
D - resource type
resourceSupplier - a Callable that is called on subscribe to create the resource
sourceSupplier - a Mono factory to create the Mono depending on the created resource
resourceCleanup - invoked on completion to clean-up the resource
eager - set to true to clean before any signal (including onNext) is passed downstream
Returns: a new Mono
public static <T,D> Mono<T> using(java.util.concurrent.Callable<? extends D> resourceSupplier, java.util.function.Function<? super D,? extends Mono<? extends T>> sourceSupplier, java.util.function.Consumer<? super D> resourceCleanup)
Uses a resource, generated by a supplier for each individual Subscriber, while streaming the value from a Mono derived from the same resource and makes sure the resource is released if the sequence terminates or the Subscriber cancels.
Unlike in Flux, in the case of a valued Mono the cleanup happens just before passing the value to downstream. In all cases, exceptions raised by the cleanup Consumer may override the terminal event, discarding the element if the derived Mono was valued.
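The ordering of cleanup relative to the value can be observed with a minimal sketch, assuming reactor-core is on the classpath. The string resource and the class name `MonoUsingSketch` are stand-ins for a real resource such as a connection.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Mono;

public class MonoUsingSketch {

    // Records the order in which the resource is opened, cleaned up and the value observed.
    static List<String> trace() {
        List<String> events = new ArrayList<>();
        String value = Mono.<String, String>using(
                        () -> { events.add("open"); return "resource"; }, // resourceSupplier
                        resource -> Mono.just(resource + "-value"),      // sourceSupplier
                        resource -> events.add("cleanup"))               // resourceCleanup
                .doOnNext(v -> events.add("onNext"))
                .block();
        events.add(value);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(trace());
    }
}
```

With the default eager cleanup, the trace shows the cleanup running before the downstream `doOnNext` callback sees the value.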
public static <T,D> Mono<T> usingWhen(org.reactivestreams.Publisher<D> resourceSupplier, java.util.function.Function<? super D,? extends Mono<? extends T>> resourceClosure, java.util.function.Function<? super D,? extends org.reactivestreams.Publisher<?>> asyncCleanup)
Uses a resource, generated by a Publisher for each individual Subscriber, to derive a Mono. Note that all steps of the operator chain that would need the resource to be in an open stable state need to be described inside the resourceClosure Function.
Unlike in the Flux counterpart, ALL signals are deferred until the Mono terminates and the relevant Function generates and invokes a "cleanup" Publisher. This is because a failure in the cleanup Publisher must result in a lone onError signal in the downstream Mono (any potential value in the derived Mono is discarded). Here are the various scenarios that can play out:
empty Mono, asyncCleanup ends with onComplete(): downstream receives onComplete()
empty Mono, asyncCleanup ends with onError(t): downstream receives onError(t)
valued Mono, asyncCleanup ends with onComplete(): downstream receives onNext(value),onComplete()
valued Mono, asyncCleanup ends with onError(t): downstream receives onError(t), value is discarded
error(e) Mono, asyncCleanup ends with onComplete(): downstream receives onError(e)
error(e) Mono, asyncCleanup ends with onError(t): downstream receives onError(t), t suppressing e
Note that if the resource supplying Publisher emits more than one resource, the subsequent resources are dropped (Operators.onNextDropped(Object, Context)). If the publisher errors AFTER having emitted one resource, the error is also silently dropped (Operators.onErrorDropped(Throwable, Context)). An empty completion or error without at least one onNext signal (no resource supplied) triggers a short-circuit of the main sequence with the same terminal signal (no cleanup is invoked).
Discard Support: This operator discards any source element if the asyncCleanup handler fails.
T - the type of elements emitted by the resource closure, and thus the main sequence
D - the type of the resource object
resourceSupplier - a Publisher that "generates" the resource, subscribed for each subscription to the main sequence
resourceClosure - a factory to derive a Mono from the supplied resource
asyncCleanup - an asynchronous resource cleanup invoked when the resource closure terminates (with onComplete, onError or cancel)
Returns: a new Mono built around a "transactional" resource, with deferred emission until the asynchronous cleanup sequence completes
public static <T,D> Mono<T> usingWhen(org.reactivestreams.Publisher<D> resourceSupplier, java.util.function.Function<? super D,? extends Mono<? extends T>> resourceClosure, java.util.function.Function<? super D,? extends org.reactivestreams.Publisher<?>> asyncComplete, java.util.function.BiFunction<? super D,? super java.lang.Throwable,? extends org.reactivestreams.Publisher<?>> asyncError, java.util.function.Function<? super D,? extends org.reactivestreams.Publisher<?>> asyncCancel)
Uses a resource, generated by a Publisher for each individual Subscriber, to derive a Mono. Note that all steps of the operator chain that would need the resource to be in an open stable state need to be described inside the resourceClosure Function.
Unlike in the Flux counterpart, ALL signals are deferred until the Mono terminates and the relevant Function generates and invokes a "cleanup" Publisher. This is because a failure in the cleanup Publisher must result in a lone onError signal in the downstream Mono (any potential value in the derived Mono is discarded). Here are the various scenarios that can play out:
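empty Mono, asyncComplete ends with onComplete(): downstream receives onComplete()
empty Mono, asyncComplete ends with onError(t): downstream receives onError(t)
valued Mono, asyncComplete ends with onComplete(): downstream receives onNext(value),onComplete()
valued Mono, asyncComplete ends with onError(t): downstream receives onError(t), value is discarded
error(e) Mono, asyncError ends with onComplete(): downstream receives onError(e)
error(e) Mono, asyncError ends with onError(t): downstream receives onError(t), t suppressing e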
Individual cleanups can also be associated with mono cancellation and error terminations.
Note that if the resource supplying Publisher emits more than one resource, the subsequent resources are dropped (Operators.onNextDropped(Object, Context)). If the publisher errors AFTER having emitted one resource, the error is also silently dropped (Operators.onErrorDropped(Throwable, Context)). An empty completion or error without at least one onNext signal (no resource supplied) triggers a short-circuit of the main sequence with the same terminal signal (no cleanup is invoked).
Discard Support: This operator discards the element if the asyncComplete handler fails.
T - the type of elements emitted by the resource closure, and thus the main sequence
D - the type of the resource object
resourceSupplier - a Publisher that "generates" the resource, subscribed for each subscription to the main sequence
resourceClosure - a factory to derive a Mono from the supplied resource
asyncComplete - an asynchronous resource cleanup invoked if the resource closure terminates with onComplete
asyncError - an asynchronous resource cleanup invoked if the resource closure terminates with onError. The terminating error is provided to the BiFunction
asyncCancel - an asynchronous resource cleanup invoked if the resource closure is cancelled. When null, the asyncComplete path is used instead.
Returns: a Mono built around a "transactional" resource, with several termination paths triggering asynchronous cleanup sequences
public static Mono<java.lang.Void> when(org.reactivestreams.Publisher<?>... sources)
Aggregate given publishers into a new Mono that will be fulfilled when all of the given sources have completed. An error will cause pending results to be cancelled and immediate error emission to the returned Mono.
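The difference between when and whenDelayError can be sketched as follows. This is an illustrative example only; the counter and the method names are assumptions made for the sketch, not part of the API.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class WhenSketch {

    // Both sources complete, so the aggregated Mono<Void> completes as well.
    static int completedWithWhen() {
        AtomicInteger done = new AtomicInteger();
        Mono<Void> first = Mono.fromRunnable(() -> done.incrementAndGet());
        Mono<Void> second = Mono.fromRunnable(() -> done.incrementAndGet());
        Mono.when(first, second).block();
        return done.get();
    }

    // With whenDelayError, the working source still runs to completion
    // before the error of the failing source is propagated.
    static String completedWithWhenDelayError() {
        AtomicInteger done = new AtomicInteger();
        Mono<Void> failing = Mono.error(new IllegalStateException("boom"));
        Mono<Void> working = Mono.fromRunnable(() -> done.incrementAndGet());
        try {
            Mono.whenDelayError(failing, working).block();
            return "completed";
        } catch (RuntimeException e) {
            return "error after " + done.get() + " completion(s)";
        }
    }
}
```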
sources - The sources to use.
Returns: a Mono.
public static Mono<java.lang.Void> when(java.lang.Iterable<? extends org.reactivestreams.Publisher<?>> sources)
Aggregate given publishers into a new Mono that will be fulfilled when all of the given Publishers have completed. An error will cause pending results to be cancelled and immediate error emission to the returned Mono.
sources - The sources to use.
Returns: a Mono.
public static Mono<java.lang.Void> whenDelayError(java.lang.Iterable<? extends org.reactivestreams.Publisher<?>> sources)
Aggregate given publishers into a new Mono that will be fulfilled when all of the given sources have completed. Errors from the sources are delayed. If several Publishers error, the exceptions are combined (as suppressed exceptions on a root exception).
sources - The sources to use.
Returns: a Mono.
public static Mono<java.lang.Void> whenDelayError(org.reactivestreams.Publisher<?>... sources)
Merge given publishers into a new Mono that will be fulfilled when all of the given sources have completed. Errors from the sources are delayed. If several Publishers error, the exceptions are combined (as suppressed exceptions on a root exception).
sources - The sources to use.
Returns: a Mono.
public static <T1,T2> Mono<Tuple2<T1,T2>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple2. An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively.
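A short sketch of the zip variants may help here. It shows the Tuple2 form, the combinator form, and the empty-completion case; the class and method names are illustrative assumptions.

```java
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

public class ZipSketch {

    // Tuple form: both values are aggregated into a Tuple2.
    static String pair() {
        Tuple2<String, Integer> tuple = Mono.zip(Mono.just("a"), Mono.just(1)).block();
        return tuple.getT1() + tuple.getT2();
    }

    // Combinator form: the BiFunction decides how the two values are combined.
    static String combined() {
        return Mono.zip(Mono.just("a"), Mono.just(1), (s, i) -> s + i).block();
    }

    // If any source completes empty, the zipped Mono completes empty too,
    // so block() returns null.
    static boolean emptySourceCompletesEmpty() {
        return Mono.zip(Mono.just("a"), Mono.<Integer>empty()).block() == null;
    }
}
```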
T1 - type of the value from p1
T2 - type of the value from p2
p1 - The first upstream Publisher to subscribe to.
p2 - The second upstream Publisher to subscribe to.
Returns: a Mono.
public static <T1,T2,O> Mono<O> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, java.util.function.BiFunction<? super T1,? super T2,? extends O> combinator)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values as defined by the combinator function. An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively.
T1 - type of the value from p1
T2 - type of the value from p2
O - output value
p1 - The first upstream Publisher to subscribe to.
p2 - The second upstream Publisher to subscribe to.
combinator - a BiFunction combinator function when both sources complete
Returns: a Mono.
public static <T1,T2,T3> Mono<Tuple3<T1,T2,T3>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple3. An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively.
T1 - type of the value from p1
T2 - type of the value from p2
T3 - type of the value from p3
p1 - The first upstream Publisher to subscribe to.
p2 - The second upstream Publisher to subscribe to.
p3 - The third upstream Publisher to subscribe to.
Returns: a Mono.
public static <T1,T2,T3,T4> Mono<Tuple4<T1,T2,T3,T4>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple4. An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively.
T1 - type of the value from p1
T2 - type of the value from p2
T3 - type of the value from p3
T4 - type of the value from p4
p1 - The first upstream Publisher to subscribe to.
p2 - The second upstream Publisher to subscribe to.
p3 - The third upstream Publisher to subscribe to.
p4 - The fourth upstream Publisher to subscribe to.
Returns: a Mono.
public static <T1,T2,T3,T4,T5> Mono<Tuple5<T1,T2,T3,T4,T5>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple5. An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively.
T1 - type of the value from p1
T2 - type of the value from p2
T3 - type of the value from p3
T4 - type of the value from p4
T5 - type of the value from p5
p1 - The first upstream Publisher to subscribe to.
p2 - The second upstream Publisher to subscribe to.
p3 - The third upstream Publisher to subscribe to.
p4 - The fourth upstream Publisher to subscribe to.
p5 - The fifth upstream Publisher to subscribe to.
Returns: a Mono.
public static <T1,T2,T3,T4,T5,T6> Mono<Tuple6<T1,T2,T3,T4,T5,T6>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5, Mono<? extends T6> p6)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple6. An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively.
T1 - type of the value from p1
T2 - type of the value from p2
T3 - type of the value from p3
T4 - type of the value from p4
T5 - type of the value from p5
T6 - type of the value from p6
p1 - The first upstream Publisher to subscribe to.
p2 - The second upstream Publisher to subscribe to.
p3 - The third upstream Publisher to subscribe to.
p4 - The fourth upstream Publisher to subscribe to.
p5 - The fifth upstream Publisher to subscribe to.
p6 - The sixth upstream Publisher to subscribe to.
Returns: a Mono.
public static <T1,T2,T3,T4,T5,T6,T7> Mono<Tuple7<T1,T2,T3,T4,T5,T6,T7>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5, Mono<? extends T6> p6, Mono<? extends T7> p7)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple7. An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively.
T1 - type of the value from p1
T2 - type of the value from p2
T3 - type of the value from p3
T4 - type of the value from p4
T5 - type of the value from p5
T6 - type of the value from p6
T7 - type of the value from p7
p1 - The first upstream Publisher to subscribe to.
p2 - The second upstream Publisher to subscribe to.
p3 - The third upstream Publisher to subscribe to.
p4 - The fourth upstream Publisher to subscribe to.
p5 - The fifth upstream Publisher to subscribe to.
p6 - The sixth upstream Publisher to subscribe to.
p7 - The seventh upstream Publisher to subscribe to.
Returns: a Mono.
public static <T1,T2,T3,T4,T5,T6,T7,T8> Mono<Tuple8<T1,T2,T3,T4,T5,T6,T7,T8>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5, Mono<? extends T6> p6, Mono<? extends T7> p7, Mono<? extends T8> p8)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple8. An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively.
T1 - type of the value from p1
T2 - type of the value from p2
T3 - type of the value from p3
T4 - type of the value from p4
T5 - type of the value from p5
T6 - type of the value from p6
T7 - type of the value from p7
T8 - type of the value from p8
p1 - The first upstream Publisher to subscribe to.
p2 - The second upstream Publisher to subscribe to.
p3 - The third upstream Publisher to subscribe to.
p4 - The fourth upstream Publisher to subscribe to.
p5 - The fifth upstream Publisher to subscribe to.
p6 - The sixth upstream Publisher to subscribe to.
p7 - The seventh upstream Publisher to subscribe to.
p8 - The eighth upstream Publisher to subscribe to.
Returns: a Mono.
public static <R> Mono<R> zip(java.lang.Iterable<? extends Mono<?>> monos, java.util.function.Function<? super java.lang.Object[],? extends R> combinator)
R - the combined result
monos - The monos to use.
combinator - the function to transform the combined array into an arbitrary object.
Returns: a Mono.
public static <R> Mono<R> zip(java.util.function.Function<? super java.lang.Object[],? extends R> combinator, Mono<?>... monos)
R - the combined result
monos - The monos to use.
combinator - the function to transform the combined array into an arbitrary object.
Returns: a Mono.
public static <T1,T2> Mono<Tuple2<T1,T2>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple2 and delaying errors. If a Mono source completes without value, the other source is run to completion then the resulting Mono completes empty. If both Monos error, the two exceptions are combined (as suppressed exceptions on a root exception).
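The error-combining behavior can be observed with a small sketch. It subscribes with an error consumer rather than calling block(), and uses Exceptions.unwrapMultiple to list the combined exceptions; the class and method names are illustrative assumptions.

```java
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;

public class ZipDelayErrorSketch {

    // Both sources fail: the downstream error is a root exception
    // carrying the two source exceptions as suppressed exceptions.
    static int combinedErrorCount() {
        Mono<String> first = Mono.error(new IllegalStateException("first"));
        Mono<Integer> second = Mono.error(new IllegalArgumentException("second"));

        AtomicReference<Throwable> seen = new AtomicReference<>();
        Mono.zipDelayError(first, second)
            .subscribe(tuple -> { }, seen::set);

        // unwrapMultiple returns the suppressed exceptions of a combined error
        return Exceptions.unwrapMultiple(seen.get()).size();
    }
}
```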
T1 - type of the value from p1
T2 - type of the value from p2
p1 - The first upstream Publisher to subscribe to.
p2 - The second upstream Publisher to subscribe to.
Returns: a Mono.
public static <T1,T2,T3> Mono<Tuple3<T1,T2,T3>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple3 and delaying errors. If a Mono source completes without value, all other sources are run to completion then the resulting Mono completes empty. If several Monos error, their exceptions are combined (as suppressed exceptions on a root exception).
T1 - type of the value from p1
T2 - type of the value from p2
T3 - type of the value from p3
p1 - The first upstream Publisher to subscribe to.
p2 - The second upstream Publisher to subscribe to.
p3 - The third upstream Publisher to subscribe to.
Returns: a Mono.
public static <T1,T2,T3,T4> Mono<Tuple4<T1,T2,T3,T4>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple4 and delaying errors. If a Mono source completes without value, all other sources are run to completion then the resulting Mono completes empty. If several Monos error, their exceptions are combined (as suppressed exceptions on a root exception).
T1 - type of the value from p1
T2 - type of the value from p2
T3 - type of the value from p3
T4 - type of the value from p4
p1 - The first upstream Publisher to subscribe to.
p2 - The second upstream Publisher to subscribe to.
p3 - The third upstream Publisher to subscribe to.
p4 - The fourth upstream Publisher to subscribe to.
Returns: a Mono.
public static <T1,T2,T3,T4,T5> Mono<Tuple5<T1,T2,T3,T4,T5>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple5 and delaying errors. If a Mono source completes without value, all other sources are run to completion then the resulting Mono completes empty. If several Monos error, their exceptions are combined (as suppressed exceptions on a root exception).
T1 - type of the value from p1
T2 - type of the value from p2
T3 - type of the value from p3
T4 - type of the value from p4
T5 - type of the value from p5
p1 - The first upstream Publisher to subscribe to.
p2 - The second upstream Publisher to subscribe to.
p3 - The third upstream Publisher to subscribe to.
p4 - The fourth upstream Publisher to subscribe to.
p5 - The fifth upstream Publisher to subscribe to.
Returns: a Mono.
public static <T1,T2,T3,T4,T5,T6> Mono<Tuple6<T1,T2,T3,T4,T5,T6>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5, Mono<? extends T6> p6)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple6 and delaying errors. If a Mono source completes without value, all other sources are run to completion then the resulting Mono completes empty. If several Monos error, their exceptions are combined (as suppressed exceptions on a root exception).
T1 - type of the value from p1
T2 - type of the value from p2
T3 - type of the value from p3
T4 - type of the value from p4
T5 - type of the value from p5
T6 - type of the value from p6
p1 - The first upstream Publisher to subscribe to.
p2 - The second upstream Publisher to subscribe to.
p3 - The third upstream Publisher to subscribe to.
p4 - The fourth upstream Publisher to subscribe to.
p5 - The fifth upstream Publisher to subscribe to.
p6 - The sixth upstream Publisher to subscribe to.
Returns: a Mono.
public static <T1,T2,T3,T4,T5,T6,T7> Mono<Tuple7<T1,T2,T3,T4,T5,T6,T7>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5, Mono<? extends T6> p6, Mono<? extends T7> p7)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple7 and delaying errors. If a Mono source completes without value, all other sources are run to completion then the resulting Mono completes empty. If several Monos error, their exceptions are combined (as suppressed exceptions on a root exception).
T1 - type of the value from p1
T2 - type of the value from p2
T3 - type of the value from p3
T4 - type of the value from p4
T5 - type of the value from p5
T6 - type of the value from p6
T7 - type of the value from p7
p1 - The first upstream Publisher to subscribe to.
p2 - The second upstream Publisher to subscribe to.
p3 - The third upstream Publisher to subscribe to.
p4 - The fourth upstream Publisher to subscribe to.
p5 - The fifth upstream Publisher to subscribe to.
p6 - The sixth upstream Publisher to subscribe to.
p7 - The seventh upstream Publisher to subscribe to.
Returns: a Mono.
public static <T1,T2,T3,T4,T5,T6,T7,T8> Mono<Tuple8<T1,T2,T3,T4,T5,T6,T7,T8>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3, Mono<? extends T4> p4, Mono<? extends T5> p5, Mono<? extends T6> p6, Mono<? extends T7> p7, Mono<? extends T8> p8)
Merge given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple8 and delaying errors. If a Mono source completes without value, all other sources are run to completion then the resulting Mono completes empty. If several Monos error, their exceptions are combined (as suppressed exceptions on a root exception).
T1 - type of the value from p1
T2 - type of the value from p2
T3 - type of the value from p3
T4 - type of the value from p4
T5 - type of the value from p5
T6 - type of the value from p6
T7 - type of the value from p7
T8 - type of the value from p8
p1 - The first upstream Publisher to subscribe to.
p2 - The second upstream Publisher to subscribe to.
p3 - The third upstream Publisher to subscribe to.
p4 - The fourth upstream Publisher to subscribe to.
p5 - The fifth upstream Publisher to subscribe to.
p6 - The sixth upstream Publisher to subscribe to.
p7 - The seventh upstream Publisher to subscribe to.
p8 - The eighth upstream Publisher to subscribe to.
Returns: a Mono.
public static <R> Mono<R> zipDelayError(java.lang.Iterable<? extends Mono<?>> monos, java.util.function.Function<? super java.lang.Object[],? extends R> combinator)
If a Mono source completes without value, all other sources are run to completion then the resulting Mono completes empty. If several Monos error, their exceptions are combined (as suppressed exceptions on a root exception).
R - the combined result
monos - The monos to use.
combinator - the function to transform the combined array into an arbitrary object.
Returns: a Mono.
public static <R> Mono<R> zipDelayError(java.util.function.Function<? super java.lang.Object[],? extends R> combinator, Mono<?>... monos)
If a Mono source completes without value, all other sources are run to completion then the resulting Mono completes empty. If several Monos error, their exceptions are combined (as suppressed exceptions on a root exception).
R - the combined result
monos - The monos to use.
combinator - the function to transform the combined array into an arbitrary object.
Returns: a Mono.
public final <P> P as(java.util.function.Function<? super Mono<T>,P> transformer)
Transform this Mono into a target type.
mono.as(Flux::from).subscribe()
P - the returned instance type
transformer - the Function to immediately map this Mono into a target type
Returns: the Mono transformed to an instance of P
See also: transformDeferred(Function) for a lazy transformation of Mono
public final Mono<java.lang.Void> and(org.reactivestreams.Publisher<?> other)
Join the termination signals from this mono and another source into the returned void mono.
other - the Publisher to wait for complete
See also: when(org.reactivestreams.Publisher<?>...)
@Nullable public T block()
Subscribe to this Mono and block indefinitely until a next signal is received. Returns that value, or null if the Mono completes empty. In case the Mono errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception).
Note that each block() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
@Nullable public T block(java.time.Duration timeout)
Subscribe to this Mono and block until a next signal is received or a timeout expires. Returns that value, or null if the Mono completes empty. In case the Mono errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception). If the provided timeout expires, a RuntimeException is thrown.
Note that each block() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
timeout - maximum time period to wait for before raising a RuntimeException
public java.util.Optional<T> blockOptional()
Subscribe to this Mono and block indefinitely until a next signal is received or the Mono completes empty. Returns an Optional, which can be used to replace the empty case with an Exception via Optional.orElseThrow(Supplier). In case the Mono itself errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception).
Note that each blockOptional() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
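A brief sketch of the blocking variants, assuming they are called from a thread where blocking is acceptable (the class and method names are illustrative):

```java
import java.time.Duration;
import reactor.core.publisher.Mono;

public class BlockSketch {

    // A valued Mono yields a present Optional.
    static String valued() {
        return Mono.just("value").blockOptional().orElse("fallback");
    }

    // An empty Mono yields an empty Optional instead of null.
    static String empty() {
        return Mono.<String>empty().blockOptional().orElse("fallback");
    }

    // A Mono that never emits makes block(Duration) throw a RuntimeException
    // once the timeout expires.
    static boolean timesOut() {
        try {
            Mono.never().block(Duration.ofMillis(50));
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }
}
```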
public java.util.Optional<T> blockOptional(java.time.Duration timeout)
Subscribe to this Mono and block until a next signal is received, the Mono completes empty or a timeout expires. Returns an Optional for the first two cases, which can be used to replace the empty case with an Exception via Optional.orElseThrow(Supplier). In case the Mono itself errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception). If the provided timeout expires, a RuntimeException is thrown.
Note that each blockOptional() will trigger a new subscription: in other words, the result might miss signals from hot publishers.
timeout - maximum time period to wait for before raising a RuntimeException
public final <E> Mono<E> cast(java.lang.Class<E> clazz)
Cast the current Mono produced type into a target produced type.
public final Mono<T> cache()
Turn this Mono into a hot source and cache last emitted signals for further Subscriber. Completion and Error will also be replayed.
Once the first subscription is made to this Mono, the source is subscribed to and the signal will be cached, indefinitely. This process cannot be cancelled.
In the face of multiple concurrent subscriptions, this operator ensures that only one subscription is made to the source.
Returns: a replaying Mono
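The effect of cache() on the number of source subscriptions can be sketched as follows; the counter and class name are assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class CacheSketch {

    static int sourceSubscriptions() {
        AtomicInteger subscriptions = new AtomicInteger();
        // The supplier runs once per subscription to the source.
        Mono<Integer> source = Mono.fromSupplier(() -> subscriptions.incrementAndGet());
        Mono<Integer> cached = source.cache();

        cached.block(); // first subscriber triggers the single source subscription
        cached.block(); // second subscriber is served the cached signal

        return subscriptions.get();
    }
}
```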
public final Mono<T> cache(java.time.Duration ttl)
The Duration specifies when the Mono cache expires.
Turn this Mono into a hot source and cache last emitted signals for further Subscriber, with an expiry timeout. Completion and Error will also be replayed until ttl triggers in which case the next Subscriber will start over a new subscription.
Cache loading (ie. subscription to the source) is triggered atomically by the first subscription to an uninitialized or expired cache, which guarantees that a single cache load happens at a time (and other subscriptions will get notified of the newly cached value when it arrives).
Returns: a replaying Mono
public final Mono<T> cache(java.time.Duration ttl, Scheduler timer)
Turn this Mono into a hot source and cache last emitted signals for further Subscriber, with an expiry timeout. Completion and Error will also be replayed until ttl triggers in which case the next Subscriber will start over a new subscription.
Cache loading (ie. subscription to the source) is triggered atomically by the first subscription to an uninitialized or expired cache, which guarantees that a single cache load happens at a time (and other subscriptions will get notified of the newly cached value when it arrives).
public final Mono<T> cache(java.util.function.Function<? super T,java.time.Duration> ttlForValue, java.util.function.Function<java.lang.Throwable,java.time.Duration> ttlForError, java.util.function.Supplier<java.time.Duration> ttlForEmpty)
Turn this Mono into a hot source and cache last emitted signal for further Subscriber, with an expiry timeout (TTL) that depends on said signal. A TTL of Long.MAX_VALUE milliseconds is interpreted as indefinite caching of the signal (no cache cleanup is scheduled, so the signal is retained as long as this Mono is not garbage collected).
Empty completion and Error will also be replayed according to their respective TTL, so transient errors can be "retried" by letting the Function return Duration.ZERO. Such a transient exception would then be propagated to the first subscriber but the following subscribers would trigger a new source subscription.
Exceptions in the TTL generators themselves are processed like the Duration.ZERO case, except the original signal is suppressed (in case of onError) or dropped (in case of onNext).
Note that subscribers that come in perfectly simultaneously could receive the same cached signal even if the TTL is set to zero.
Cache loading (ie. subscription to the source) is triggered atomically by the first subscription to an uninitialized or expired cache, which guarantees that a single cache load happens at a time (and other subscriptions will get notified of the newly cached value when it arrives).
ttlForValue - the TTL-generating Function invoked when source is valued
ttlForError - the TTL-generating Function invoked when source is erroring
ttlForEmpty - the TTL-generating Supplier invoked when source is empty
Returns: a replaying Mono
public final Mono<T> cache(java.util.function.Function<? super T,java.time.Duration> ttlForValue, java.util.function.Function<java.lang.Throwable,java.time.Duration> ttlForError, java.util.function.Supplier<java.time.Duration> ttlForEmpty, Scheduler timer)
Turn this Mono into a hot source and cache last emitted signal for further Subscriber, with an expiry timeout (TTL) that depends on said signal. A TTL of Long.MAX_VALUE milliseconds is interpreted as indefinite caching of the signal (no cache cleanup is scheduled, so the signal is retained as long as this Mono is not garbage collected).
Empty completion and Error will also be replayed according to their respective TTL, so transient errors can be "retried" by letting the Function return Duration.ZERO. Such a transient exception would then be propagated to the first subscriber but the following subscribers would trigger a new source subscription.
Exceptions in the TTL generators themselves are processed like the Duration.ZERO case, except the original signal is suppressed (in case of onError) or dropped (in case of onNext).
Note that subscribers that come in perfectly simultaneously could receive the same cached signal even if the TTL is set to zero.
Cache loading (ie. subscription to the source) is triggered atomically by the first subscription to an uninitialized or expired cache, which guarantees that a single cache load happens at a time (and other subscriptions will get notified of the newly cached value when it arrives).
public final Mono<T> cacheInvalidateIf(java.util.function.Predicate<? super T> invalidationPredicate)
Note: this method does not accept empty().
Cache onNext signal received from the source and replay it to other subscribers, while allowing invalidation by verifying the cached value against the given Predicate each time a late subscription occurs. Note that the Predicate is only evaluated if the cache is currently populated, i.e. it is not applied upon receiving the source onNext signal. For late subscribers, if the predicate returns true the cache is invalidated and a new subscription is made to the source in an effort to refresh the cache with a more up-to-date value to be passed to the new subscriber.
The predicate is not strictly evaluated once per downstream subscriber. Rather, subscriptions happening in concurrent batches will trigger a single evaluation of the predicate. Similarly, a batch of subscriptions happening before the cache is populated (i.e. before this operator receives an onNext signal after an invalidation) will always receive the incoming value without going through the Predicate. The predicate is only triggered by subscribers that come in AFTER the cache is populated. Therefore, it is possible that pre-population subscribers receive an "invalid" value, especially if the object can switch from a valid to an invalid state in a short amount of time (e.g. between creation, cache population and propagation to the downstream subscriber(s)).
If the cached value needs to be discarded in case of invalidation, the recommended way is to do so in the predicate directly. Note that some downstream subscribers might still be using or storing the value, for example if they haven't requested anything yet.
As this form of caching is explicitly value-oriented, empty source completion signals and error signals are NOT cached. It is always possible to use materialize() to cache these (further using filter(Predicate) if one wants to only consider empty sources or error sources).
Predicate is applied differently depending on whether the cache is populated or not:
Cancellation is only possible for downstream subscribers when they've been added to a COORDINATOR. Subscribers that are received when POPULATED will either be completed right away or (if the predicate fails) end up being added to a COORDINATOR.
When cancelling a COORDINATOR-issued subscription:
The fact that COORDINATOR cancels its source when no more subscribers remain is important, because it prevents issues with a never() source or a source that never produces a value passing the predicate (assuming timeouts on the subscriber).
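The late-subscriber behavior described above can be illustrated with a sequential sketch. The counter, the "value < 2" predicate, and the class name are assumptions made for the example; with a synchronous source, each late subscription either replays the cached value or refreshes it when the predicate reports it as invalid.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class CacheInvalidateIfSketch {

    static String observedValues() {
        AtomicInteger counter = new AtomicInteger();
        // Each source subscription produces the next counter value.
        Mono<Integer> source = Mono.fromSupplier(() -> counter.incrementAndGet());

        // Returning true from the predicate marks the cached value as invalid.
        Mono<Integer> cached = source.cacheInvalidateIf(value -> value < 2);

        Integer first = cached.block();  // populates the cache; predicate not applied
        Integer second = cached.block(); // late subscriber: cached value is checked
        Integer third = cached.block();  // late subscriber: refreshed value is checked

        return first + "," + second + "," + third;
    }
}
```

Because the predicate is not applied when the cache is first populated, the first subscriber still receives a value that the predicate would reject.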
invalidationPredicate - the Predicate used for cache invalidation. Returning true means the value is invalid and should be removed from the cache.
Returns: a Mono which can be invalidated
public final Mono<T> cacheInvalidateWhen(java.util.function.Function<? super T,Mono<java.lang.Void>> invalidationTriggerGenerator)
empty(): the cache is invalidated; never(): the cache remains in effect; otherwise an exception is thrown.
Cache onNext signal received from the source and replay it to other subscribers, while allowing invalidation via a Mono<Void> companion trigger generated from the currently cached value.
As this form of caching is explicitly value-oriented, empty source completion signals and error signals are NOT cached. It is always possible to use materialize() to cache these (further using filter(Predicate) if one wants to only consider empty sources or error sources). The exception is still propagated to the subscribers that have accumulated between the time the source has been subscribed to and the time the onError/onComplete terminal signal is received. An empty source is turned into a NoSuchElementException onError.
Completion of the trigger will invalidate the cached element, so the next subscriber that comes in will trigger a new subscription to the source, re-populating the cache and re-creating a new trigger out of that value.
never()
), the source subscription is cancelled.
If the cached value needs to be discarded in case of invalidation, use the cacheInvalidateWhen(Function, Consumer) version. Note that some downstream subscribers might still be using or storing the value, for example if they haven't requested anything yet.
Trigger is generated only after the subscribers in the COORDINATOR have received the value, and only once. The only way to get out of the POPULATED state is to use the trigger, so there cannot be multiple trigger subscriptions, nor concurrent triggering.
Cancellation is only possible for downstream subscribers when they've been added to a COORDINATOR. Subscribers that are received when POPULATED will either be completed right away or (if the predicate fails) end up being added to a COORDINATOR.
When cancelling a COORDINATOR-issued subscription:
The fact that COORDINATOR cancels its source when no more subscribers remain is important, because it prevents issues with a never() source or a source that never produces a value passing the predicate (assuming timeouts on the subscriber).
invalidationTriggerGenerator - the Function that generates new Mono<Void> triggers used for invalidation
Returns: a Mono which can be invalidated
public final Mono<T> cacheInvalidateWhen(java.util.function.Function<? super T,Mono<java.lang.Void>> invalidationTriggerGenerator, java.util.function.Consumer<? super T> onInvalidate)
Cache onNext signal received from the source and replay it to other subscribers, while allowing invalidation via a Mono<Void> companion trigger generated from the currently cached value.
As this form of caching is explicitly value-oriented, empty source completion signals and error signals are NOT cached. It is always possible to use materialize() to cache these (further using filter(Predicate) if one wants to only consider empty sources or error sources). The exception is still propagated to the subscribers that have accumulated between the time the source has been subscribed to and the time the onError/onComplete terminal signal is received. An empty source is turned into a NoSuchElementException onError.
Completion of the trigger will invalidate the cached element, so the next subscriber that comes in will trigger a new subscription to the source, re-populating the cache and re-creating a new trigger out of that value.
If all subscribers cancel before the cache is populated (for example, when the source is never()), the source subscription is cancelled.
Once a cached value is invalidated, it is passed to the provided Consumer (which MUST complete normally).
Note that some downstream subscribers might still be using or storing the value, for example if they haven't requested anything yet.
The trigger is generated only after the subscribers in the COORDINATOR have received the value, and only once. The only way to get out of the POPULATED state is to use the trigger, so there cannot be multiple trigger subscriptions, nor concurrent triggering.
Cancellation is only possible for downstream subscribers once they have been added to a COORDINATOR. Subscribers that arrive in the POPULATED state will either be completed right away or (if the predicate fails) end up being added to a COORDINATOR.
When cancelling a COORDINATOR-issued subscription:
The fact that COORDINATOR cancels its source when no more subscribers remain is important, because it prevents issues with a never() source or a source that never produces a value passing the predicate (assuming timeouts on the subscriber).
invalidationTriggerGenerator - the Function that generates new Mono<Void> triggers used for invalidation
onInvalidate - the Consumer that will be applied to the cached value upon invalidation
Mono which can be invalidated
public final Mono<T> checkpoint()
If you know which chain the problem is in, but that chain's upstream or downstream comes from other calls, you can apply checkpoint() to that chain to locate the problem.
Activate traceback (full assembly tracing) for this particular Mono, in case of an error upstream of the checkpoint. Tracing incurs the cost of an exception stack trace creation.
It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly trace.
The traceback is attached to the error as a suppressed exception. As such, if the error is a composite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable).
Mono
public final Mono<T> checkpoint(java.lang.String description)
Activate traceback (assembly marker) for this particular Mono by giving it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint. Note that unlike checkpoint(), this doesn't create a filled stack trace, avoiding the main cost of the operator. However, as a trade-off the description must be unique enough for the user to find out where this Mono was assembled. If you only want a generic description, and still rely on the stack trace to find the assembly site, use the checkpoint(String, boolean) variant.
It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly trace.
The traceback is attached to the error as a suppressed exception. As such, if the error is a composite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable).
description - a unique enough description to include in the light assembly traceback.
Mono
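The light checkpoint described above can be sketched as follows. This is a minimal illustration, not a recommended setup: the class name, the "user-lookup" description and the failing source are all made up. It captures the error rethrown by block() and checks whether one of its suppressed exceptions is a traceback.

```java
import java.util.Arrays;

import reactor.core.Exceptions;
import reactor.core.publisher.Mono;

class CheckpointSketch {

    // Runs a failing chain with a light checkpoint and returns the error seen by the caller.
    static Throwable failWithCheckpoint() {
        try {
            Mono.<String>error(new IllegalStateException("boom"))
                .map(String::toUpperCase)
                .checkpoint("user-lookup") // hypothetical description; no stack trace is filled
                .block();
            return null;
        }
        catch (RuntimeException e) {
            return e;
        }
    }

    // True if a traceback was attached to the error as a suppressed exception.
    static boolean hasTraceback(Throwable error) {
        return Arrays.stream(error.getSuppressed()).anyMatch(Exceptions::isTraceback);
    }

    public static void main(String[] args) {
        Throwable error = failWithCheckpoint();
        System.out.println(error + ", traceback attached: " + hasTraceback(error));
    }
}
```

Because the checkpoint sits downstream of the failing source, the error passes through it and gets augmented. An error raised after the checkpoint would not be.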
public final Mono<T> checkpoint(@Nullable java.lang.String description, boolean forceStackTrace)
Activate traceback (full assembly tracing or the lighter assembly marking, depending on the forceStackTrace option).
By setting the forceStackTrace parameter to true, activate assembly tracing for this particular Mono and give it a description that will be reflected in the assembly traceback in case of an error upstream of the checkpoint. Note that unlike checkpoint(String), this will incur the cost of an exception stack trace creation. The description could for example be a meaningful name for the assembled mono or a wider correlation ID, since the stack trace will always provide enough information to locate where this Mono was assembled.
By setting forceStackTrace to false, behaves like checkpoint(String) and is subject to the same caveat in choosing the description.
It should be placed towards the end of the reactive chain, as errors triggered downstream of it cannot be observed and augmented with assembly marker.
The traceback is attached to the error as a suppressed exception. As such, if the error is a composite one, the traceback would appear as a component of the composite. In any case, the traceback nature can be detected via Exceptions.isTraceback(Throwable).
description - a description (must be unique enough if forceStackTrace is set to false).
forceStackTrace - false to make a light checkpoint without a stacktrace, true to use a stack trace.
Mono
public final Flux<T> concatWith(org.reactivestreams.Publisher<? extends T> other)
Concatenate emissions of this Mono with the provided Publisher (no interleave).
public final Mono<T> contextCapture()
If the context-propagation library is on the classpath, this is a convenience shortcut to capture thread local values during the subscription phase and put them in the Context that is visible upstream of this operator.
As a result this operator should generally be used as close as possible to the end of the chain / subscription point.
If the ContextView visible upstream is not empty, a small subset of operators will automatically restore the context snapshot (handle, tap).
If context-propagation is not available at runtime, this operator simply returns the current Mono instance.
Mono where context-propagation API has been used to capture entries and inject them into the Context
handle(BiConsumer), tap(SignalListenerFactory)
public final Mono<T> contextWrite(ContextView contextToAppend)
Merge the given ContextView with the downstream ContextView.
Enrich the Context visible from downstream for the benefit of upstream operators, by making all values from the provided ContextView visible on top of pairs from downstream.
A Context (and its ContextView) is tied to a given subscription and is read by querying the downstream Subscriber. Subscribers that don't enrich the context instead access their own downstream's context. As a result, this operator conceptually enriches a Context coming from under it in the chain (downstream, by default an empty one) and makes the new enriched Context visible to operators above it in the chain.
contextToAppend - the ContextView to merge with the downstream Context, resulting in a new more complete Context that will be visible from upstream.
Mono
ContextView
public final Mono<T> contextWrite(java.util.function.Function<Context,Context> contextModifier)
Enrich the Context visible from downstream for the benefit of upstream operators, by applying a Function to the downstream Context.
The Function takes a Context for convenience, making it easy to call write APIs that return a new Context.
A Context (and its ContextView) is tied to a given subscription and is read by querying the downstream Subscriber. Subscribers that don't enrich the context instead access their own downstream's context. As a result, this operator conceptually enriches a Context coming from under it in the chain (downstream, by default an empty one) and makes the new enriched Context visible to operators above it in the chain.
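The "visible to operators above it" rule can be illustrated with a small sketch. The class name, the "user" key and the "nobody" fallback are assumptions chosen for the example. The first method reads the Context upstream of contextWrite, the second one reads it downstream.

```java
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

class ContextWriteSketch {

    // The reader sits above contextWrite in the chain, so it sees the "user" entry.
    static String readUpstream() {
        return Mono.<String>deferContextual(ctx ->
                        Mono.just("Hello " + ctx.getOrDefault("user", "nobody")))
                   .contextWrite(Context.of("user", "alice"))
                   .block();
    }

    // The reader sits below contextWrite in the chain, so the entry is not visible to it.
    static String readDownstream() {
        return Mono.just("ignored")
                   .contextWrite(Context.of("user", "alice"))
                   .flatMap(v -> Mono.<String>deferContextual(ctx ->
                           Mono.just("Hello " + ctx.getOrDefault("user", "nobody"))))
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(readUpstream());
        System.out.println(readDownstream());
    }
}
```

In the second method the inner Mono is subscribed by flatMap, which only exposes the Context of its own downstream (here the empty one from block()), hence the fallback value.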
public final Mono<T> defaultIfEmpty(T defaultV)
defaultV - the alternate value if this sequence is empty
Mono
Flux.defaultIfEmpty(Object)
public final Mono<T> delayElement(java.time.Duration delay)
Delay this Mono element (Subscriber.onNext(T) signal) by a given duration. Empty Monos or error signals are not delayed.
Note that the scheduler on which the Mono chain continues execution will be the parallel scheduler if the mono is valued, or the current scheduler if the mono completes empty or errors.
delay - duration by which to delay the Subscriber.onNext(T) signal
Mono
public final Mono<T> delayElement(java.time.Duration delay, Scheduler timer)
Delay this Mono element (Subscriber.onNext(T) signal) by a given Duration, on a particular Scheduler. Empty monos or error signals are not delayed.
Note that the scheduler on which the mono chain continues execution will be the scheduler provided if the mono is valued, or the current scheduler if the mono completes empty or errors.
public final Mono<T> delayUntil(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<?>> triggerProvider)
Subscribe to this Mono and another Publisher that is generated from this Mono's element and which will be used as a trigger for relaying said element.
That is to say, the resulting Mono delays until this Mono's element is emitted, generates a trigger Publisher and then delays again until the trigger Publisher terminates.
Note that contiguous calls to delayUntil are fused together. The triggers are generated and subscribed to in sequence, once the previous trigger completes. An error in a trigger is propagated immediately downstream, and so is an error in the source.
triggerProvider - a Function that maps this Mono's value into a Publisher whose termination will trigger relaying the value.
public final Mono<T> delaySubscription(java.time.Duration delay)
Delay the subscription to this Mono source until the given period elapses.
public final <U> Mono<T> delaySubscription(org.reactivestreams.Publisher<U> subscriptionDelay)
U - the other source type
subscriptionDelay - a Publisher to signal by next or complete this subscribe(Subscriber)
Mono
public final <X> Mono<X> dematerialize()
An operator working only if this Mono emits onNext, onError or onComplete Signal instances, transforming these materialized signals into real signals on the Subscriber. The error Signal will trigger onError and the complete Signal will trigger onComplete.
X - the dematerialized type
Mono
materialize()
public final Mono<T> doAfterTerminate(java.lang.Runnable afterTerminate)
Add behavior (side-effect) triggered after the Mono terminates, either by completing downstream successfully or with an error.
The relevant signal is propagated downstream, then the Runnable is executed.
afterTerminate - the callback to call after Subscriber.onComplete() or Subscriber.onError(java.lang.Throwable)
Mono
public final Mono<T> doFirst(java.lang.Runnable onFirst)
Add behavior (side-effect) triggered before the Mono is subscribed to, which should be the first event after assembly time.
Note that when several doFirst(Runnable) operators are used anywhere in a chain of operators, their order of execution is reversed compared to the declaration order (as the subscribe signal flows backward, from the ultimate subscriber to the source publisher):
    Mono.just(1)
        .doFirst(() -> System.out.println("three"))
        .doFirst(() -> System.out.println("two"))
        .doFirst(() -> System.out.println("one"));
    //would print one two three
In case the Runnable throws an exception, said exception will be directly propagated to the subscribing Subscriber along with a no-op Subscription, similarly to what error(Throwable) does. Otherwise, after the handler has executed, the Subscriber is directly subscribed to the original source Mono (this).
This side-effect method provides stronger first guarantees compared to doOnSubscribe(Consumer), which is triggered once the Subscription has been set up and passed to the Subscriber.
public final Mono<T> doFinally(java.util.function.Consumer<SignalType> onFinally)
Add behavior triggering after the Mono terminates for any reason, including cancellation. The terminating event (SignalType.ON_COMPLETE, SignalType.ON_ERROR and SignalType.CANCEL) is passed to the consumer, which is executed after the signal has been passed downstream.
Note that the fact that the signal is propagated downstream before the callback is executed means that several doFinally in a row will be executed in reverse order. If you want to assert the execution of the callback please keep in mind that the Mono will complete before it is executed, so its effect might not be visible immediately after e.g. a block().
onFinally - the callback to execute after a terminal signal (complete, error or cancel)
Mono
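The reverse execution order of consecutive doFinally callbacks can be observed with a sketch like the one below. The class and label names are invented for the example, and it deliberately uses a synchronous source so that both callbacks have already run on the calling thread by the time block() returns. With an asynchronous source that is not guaranteed.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Mono;

class DoFinallySketch {

    // Records which callback ran, in execution order, together with the terminating SignalType.
    static List<String> runTwoCallbacks() {
        List<String> calls = new ArrayList<>();
        Mono.just("value")
            .doFinally(signal -> calls.add("first:" + signal.name()))
            .doFinally(signal -> calls.add("second:" + signal.name()))
            .block();
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(runTwoCallbacks());
    }
}
```

The callback declared last is closest to the subscriber, so it is the first to regain control once the terminal signal has been handed downstream.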
public final Mono<T> doOnCancel(java.lang.Runnable onCancel)
Add behavior triggered when the Mono is cancelled.
The handler is executed first, then the cancel signal is propagated upstream to the source.
onCancel - the callback to call on Subscription.cancel()
Mono
public final <R> Mono<T> doOnDiscard(java.lang.Class<R> type, java.util.function.Consumer<? super R> discardHook)
Potentially modify the behavior of the whole chain of operators upstream of this one to conditionally clean up elements that get discarded by these operators.
The discardHook MUST be idempotent and safe to use on any instance of the desired type.
Calls to this method are additive, and the order of invocation of the discardHook is the same as the order of declaration (calling .filter(...).doOnDiscard(first).doOnDiscard(second) will let the filter invoke first then second handlers).
Two main categories of discarding operators exist:
type - the Class of elements in the upstream chain of operators that this cleanup hook should take into account.
discardHook - a Consumer of elements in the upstream chain of operators that performs the cleanup.
Mono that cleans up matching elements that get discarded upstream of it.
public final Mono<T> doOnNext(java.util.function.Consumer<? super T> onNext)
Add behavior triggered when the Mono emits a value successfully.
The Consumer is executed first, then the onNext signal is propagated downstream.
onNext - the callback to call on Subscriber.onNext(T)
Mono
public final Mono<T> doOnSuccess(java.util.function.Consumer<? super T> onSuccess)
Add behavior triggered as soon as the Mono can be considered to have completed successfully.
The value passed to the Consumer reflects the type of completion:
null: completed without data. The handler is executed right before onComplete is propagated downstream.
T: completed with data. The handler is executed right before onNext is propagated downstream.
The Consumer is executed before propagating either onNext or onComplete downstream.
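As a small sketch of the two completion cases (class name and labels are illustrative only), the same handler can be attached to a valued Mono and to an empty one, recording what it receives:

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Mono;

class DoOnSuccessSketch {

    // Collects what the doOnSuccess handler sees for a valued Mono and for an empty Mono.
    static List<String> observe() {
        List<String> seen = new ArrayList<>();
        Mono.just("data")
            .doOnSuccess(value -> seen.add("valued:" + value))
            .block();
        Mono.<String>empty()
            .doOnSuccess(value -> seen.add("empty:" + value)) // value is null here
            .block();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```

A handler that dereferences its argument therefore needs a null check, unless the source is known to always emit a value.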
public final Mono<T> doOnEach(java.util.function.Consumer<? super Signal<T>> signalConsumer)
Add behavior triggered when the Mono emits an item, fails with an error or completes successfully. All these events are represented as a Signal that is passed to the side-effect callback. Note that this is an advanced operator, typically used for monitoring of a Mono. These Signals have a Context associated with them.
The Consumer is executed first, then the relevant signal is propagated downstream.
signalConsumer - the mandatory callback to call on Subscriber.onNext(Object), Subscriber.onError(Throwable) and Subscriber.onComplete()
Mono
doOnNext(Consumer), doOnError(Consumer), materialize(), Signal
public final Mono<T> doOnError(java.util.function.Consumer<? super java.lang.Throwable> onError)
Add behavior triggered when the Mono completes with an error.
The Consumer is executed first, then the onError signal is propagated downstream.
onError - the error callback to call on Subscriber.onError(Throwable)
Mono
public final <E extends java.lang.Throwable> Mono<T> doOnError(java.lang.Class<E> exceptionType, java.util.function.Consumer<? super E> onError)
Add behavior triggered when the Mono completes with an error matching the given exception type.
The Consumer is executed first, then the onError signal is propagated downstream.
E - type of the error to handle
exceptionType - the type of exceptions to handle
onError - the error handler for relevant errors
Mono
public final Mono<T> doOnError(java.util.function.Predicate<? super java.lang.Throwable> predicate, java.util.function.Consumer<? super java.lang.Throwable> onError)
Add behavior triggered when the Mono completes with an error matching the given predicate.
The Consumer is executed first, then the onError signal is propagated downstream.
predicate - the matcher for exceptions to handle
onError - the error handler for relevant errors
Mono
public final Mono<T> doOnRequest(java.util.function.LongConsumer consumer)
Add behavior triggering a LongConsumer when the Mono receives any request.
Note that a non-fatal error raised in the callback will not be propagated and will simply trigger Operators.onOperatorError(Throwable, Context).
The LongConsumer is executed first, then the request signal is propagated upstream to the parent.
consumer - the consumer to invoke on each request
Mono
public final Mono<T> doOnSubscribe(java.util.function.Consumer<? super org.reactivestreams.Subscription> onSubscribe)
Add behavior (side-effect) triggered when the Mono is being subscribed, that is to say when a Subscription has been produced by the Publisher and is being passed to the Subscriber.onSubscribe(Subscription).
This method is not intended for capturing the subscription and calling its methods, but for side effects like monitoring. For instance, the correct way to cancel a subscription is to call Disposable.dispose() on the Disposable returned by subscribe().
The Consumer is executed first, then the Subscription is propagated downstream to the next subscriber in the chain that is being established.
onSubscribe - the callback to call on Subscriber.onSubscribe(Subscription)
Mono
doFirst(Runnable)
public final Mono<T> doOnTerminate(java.lang.Runnable onTerminate)
Add behavior triggered when the Mono terminates, either by completing with a value, completing empty or failing with an error. Unlike in Flux.doOnTerminate(Runnable), the simple fact that a Mono emits onNext implies completion, so the handler is invoked BEFORE the element is propagated (same as with doOnSuccess(Consumer)).
The Runnable is executed first, then the onNext/onComplete/onError signal is propagated downstream.
onTerminate - the callback to call on Subscriber.onNext(T), Subscriber.onComplete() without preceding Subscriber.onNext(T), or Subscriber.onError(java.lang.Throwable)
Mono
public final Mono<Tuple2<java.lang.Long,T>> elapsed()
Map this Mono into Tuple2<Long, T> of timemillis and source data, where the first value is the time taken to produce the data. The timemillis corresponds to the elapsed time between the subscribe and the first next signal, as measured by the parallel scheduler.
public final Mono<Tuple2<java.lang.Long,T>> elapsed(Scheduler scheduler)
Map this Mono sequence into Tuple2<Long, T> of timemillis and source data. The timemillis corresponds to the elapsed time between the subscribe and the first next signal, as measured by the provided Scheduler.
scheduler - a Scheduler instance to read time from
Mono that emits a tuple of time elapsed in milliseconds and matching data
timed(Scheduler)
public final Flux<T> expandDeep(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends T>> expander, int capacityHint)
Recursively expand elements into a graph and emit all the resulting elements using a depth-first traversal strategy.
That is: emit the value from this Mono, expand it and emit the first value at this first level of recursion, and so on... When no more recursion is possible, backtrack to the previous level and re-apply the strategy.
For example, given the hierarchical structure
    A
      - AA
        - aa1
      - AB
        - ab1
      - a1
Expands Mono.just(A) into
    A AA aa1 AB ab1 a1
expander - the Function applied at each level of recursion to expand values into a Publisher, producing a graph.
capacityHint - a capacity hint to prepare the inner queues to accommodate n elements per level of recursion.
Flux
public final Flux<T> expandDeep(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends T>> expander)
Recursively expand elements into a graph and emit all the resulting elements using a depth-first traversal strategy.
That is: emit the value from this Mono, expand it and emit the first value at this first level of recursion, and so on... When no more recursion is possible, backtrack to the previous level and re-apply the strategy.
For example, given the hierarchical structure
    A
      - AA
        - aa1
      - AB
        - ab1
      - a1
Expands Mono.just(A) into
    A AA aa1 AB ab1 a1
expander - the Function applied at each level of recursion to expand values into a Publisher, producing a graph.
Flux
public final Flux<T> expand(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends T>> expander, int capacityHint)
Recursively expand elements into a graph and emit all the resulting elements, in a breadth-first traversal order.
That is: emit the value from this Mono first, then expand it at a first level of recursion and emit all of the resulting values, then expand all of these at a second level and so on...
For example, given the hierarchical structure
    A
      - AA
        - aa1
      - AB
        - ab1
      - a1
Expands Mono.just(A) into
    A AA AB a1 aa1 ab1
expander - the Function applied at each level of recursion to expand values into a Publisher, producing a graph.
capacityHint - a capacity hint to prepare the inner queues to accommodate n elements per level of recursion.
Flux
public final Flux<T> expand(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends T>> expander)
Recursively expand elements into a graph and emit all the resulting elements, in a breadth-first traversal order.
That is: emit the value from this Mono first, then expand it at a first level of recursion and emit all of the resulting values, then expand all of these at a second level and so on...
For example, given the hierarchical structure
    A
      - AA
        - aa1
      - AB
        - ab1
      - a1
Expands Mono.just(A) into
    A AA AB a1 aa1 ab1
expander - the Function applied at each level of recursion to expand values into a Publisher, producing a graph.
Flux
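The A / AA / AB / a1 hierarchy used in the expand and expandDeep examples can be reproduced with a short sketch. The CHILDREN map, the childrenOf helper and the class name are assumptions made up for illustration; any expander function returning a Publisher would do.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ExpandSketch {

    // Hypothetical parent -> children table mirroring the hierarchy in the text.
    static final Map<String, List<String>> CHILDREN = new HashMap<>();
    static {
        CHILDREN.put("A", Arrays.asList("AA", "AB", "a1"));
        CHILDREN.put("AA", Arrays.asList("aa1"));
        CHILDREN.put("AB", Arrays.asList("ab1"));
    }

    // The expander: leaves simply expand into an empty Flux.
    static Flux<String> childrenOf(String node) {
        return Flux.fromIterable(CHILDREN.getOrDefault(node, Collections.emptyList()));
    }

    // expand emits level by level.
    static List<String> breadthFirst() {
        return Mono.just("A").expand(ExpandSketch::childrenOf).collectList().block();
    }

    // expandDeep follows each branch to its end before backtracking.
    static List<String> depthFirst() {
        return Mono.just("A").expandDeep(ExpandSketch::childrenOf).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println("expand:     " + breadthFirst());
        System.out.println("expandDeep: " + depthFirst());
    }
}
```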
public final Mono<T> filter(java.util.function.Predicate<? super T> tester)
If this Mono is valued, test the result and replay it if the predicate returns true. Otherwise complete without value.
Discard Support: This operator discards the element if it does not match the filter. It also discards upon cancellation or error triggered by a data signal.
tester - the predicate to evaluate
Mono
public final Mono<T> filterWhen(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<java.lang.Boolean>> asyncPredicate)
If this Mono is valued, test the value asynchronously using a generated Publisher<Boolean> test. The value from the Mono is replayed if the first item emitted by the test is true. It is dropped if the test is either empty or its first emitted value is false.
Note that only the first value of the test publisher is considered, and unless it is a Mono, the test will be cancelled after receiving that first value.
Discard Support: This operator discards the element if it does not match the filter. It also discards upon cancellation or error triggered by a data signal.
asyncPredicate - the function generating a Publisher of Boolean to filter the Mono with
Mono
public final <R> Mono<R> flatMap(java.util.function.Function<? super T,? extends Mono<? extends R>> transformer)
public final <R> Flux<R> flatMapMany(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapper)
Transform the item emitted by this Mono into a Publisher, then forward its emissions into the returned Flux.
R - the merged sequence type
mapper - the Function to produce a sequence of R from the eventual passed Subscriber.onNext(T)
Flux as the sequence is not guaranteed to be single at most
public final <R> Flux<R> flatMapMany(java.util.function.Function<? super T,? extends org.reactivestreams.Publisher<? extends R>> mapperOnNext, java.util.function.Function<? super java.lang.Throwable,? extends org.reactivestreams.Publisher<? extends R>> mapperOnError, java.util.function.Supplier<? extends org.reactivestreams.Publisher<? extends R>> mapperOnComplete)
Transform the signals emitted by this Mono into signal-specific Publishers, then forward the applicable Publisher's emissions into the returned Flux.
R - the type of the produced inner sequence
mapperOnNext - the Function to call on next data and returning a sequence to merge
mapperOnError - the Function to call on error signal and returning a sequence to merge
mapperOnComplete - the Supplier to call on complete signal and returning a sequence to merge
Flux as the sequence is not guaranteed to be single at most
Flux.flatMap(Function, Function, Supplier)
public final <R> Flux<R> flatMapIterable(java.util.function.Function<? super T,? extends java.lang.Iterable<? extends R>> mapper)
Transform the item emitted by this Mono into an Iterable, then forward its elements into the returned Flux.
The Iterable.iterator() method will be called at least once and at most twice.
This operator inspects each Iterable's Spliterator to assess if the iteration can be guaranteed to be finite (see Operators.onDiscardMultiple(Iterator, boolean, Context)). Since the default Spliterator wraps the Iterator we can have two Iterable.iterator() calls per iterable. This second invocation is skipped on a Collection however, a type which is assumed to be always finite.
Discard Support: Upon cancellation, this operator discards T elements it prefetched and, in some cases, attempts to discard the remainder of the currently processed Iterable (if it can safely ensure the iterator is finite). Note that this means each Iterable's Iterable.iterator() method could be invoked twice.
R - the merged output sequence type
mapper - the Function to transform input item into a sequence Iterable
Flux
public final Mono<java.lang.Boolean> hasElement()
Emit a single boolean true if this Mono has an element.
Mono with true if a value is emitted and false otherwise
public final <R> Mono<R> handle(java.util.function.BiConsumer<? super T,SynchronousSink<R>> handler)
Use a BiConsumer to handle the item emitted by this Mono. Compared with map(Function), the handler can also signal SynchronousSink.error(Throwable) or SynchronousSink.complete() to the sink.
Handle the items emitted by this Mono by calling a biconsumer with the output sink for each onNext. At most one SynchronousSink.next(Object) call must be performed and/or 0 or 1 SynchronousSink.error(Throwable) or SynchronousSink.complete().
When the context-propagation library is available at runtime and the downstream ContextView is not empty, this operator implicitly uses the library to restore thread locals around the handler BiConsumer. Typically, this would be done in conjunction with the use of the contextCapture() operator down the chain.
R - the transformed type
handler - the handling BiConsumer
Mono
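A minimal sketch of handle follows, assuming a hypothetical parse helper that either emits the parsed number through the sink or completes empty when the input is not numeric (the class and method names are invented for the example):

```java
import reactor.core.publisher.Mono;

class HandleSketch {

    // Emits the parsed integer, or completes without a value when parsing fails.
    static Mono<Integer> parse(String raw) {
        return Mono.just(raw).handle((text, sink) -> {
            try {
                sink.next(Integer.parseInt(text));
            }
            catch (NumberFormatException e) {
                sink.complete(); // sink.error(e) would propagate the failure instead
            }
        });
    }

    public static void main(String[] args) {
        System.out.println(parse("42").block());
        System.out.println(parse("oops").block());
    }
}
```

This combines what would otherwise take a map followed by a filter, while keeping the choice between emitting, completing and failing in a single place.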
public final Mono<T> hide()
Hides the identity of this Mono instance.
The main purpose of this operator is to prevent certain identity-based optimizations from happening, mostly for diagnostic purposes.
Mono preventing Publisher / Subscription based Reactor optimizations
public final Mono<T> ignoreElement()
Ignores onNext signal (dropping it) and only propagates termination events.
Discard Support: This operator discards the source element.
public final Mono<T> log()
Observe all Reactive Streams signals and trace them using Logger support. Default will use Level.INFO and java.util.logging. If SLF4J is available, it will be used instead.
The default log category will be "reactor.Mono", followed by a suffix generated from the source operator, e.g. "reactor.Mono.Map".
Mono that logs signals
Flux.log()
public final Mono<T> log(@Nullable java.lang.String category)
Observe all Reactive Streams signals and use Logger support to handle trace implementation. Default will use Level.INFO and java.util.logging. If SLF4J is available, it will be used instead.
category - to be mapped into logger configuration (e.g. org.springframework.reactor). If category ends with "." like "reactor.", a generated operator suffix will complete, e.g. "reactor.Mono.Map".
Mono
public final Mono<T> log(@Nullable java.lang.String category, java.util.logging.Level level, SignalType... options)
Observe Reactive Streams signals matching the passed filter options and use Logger support to handle trace implementation. Default will use the passed Level and java.util.logging. If SLF4J is available, it will be used instead.
Options allow fine grained filtering of the traced signal, for instance to only capture onNext and onError:
    mono.log("category", Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)
category - to be mapped into logger configuration (e.g. org.springframework.reactor). If category ends with "." like "reactor.", a generated operator suffix will complete, e.g. "reactor.Mono.Map".
level - the Level to enforce for this tracing Mono (only FINEST, FINE, INFO, WARNING and SEVERE are taken into account)
options - a vararg SignalType option to filter log messages
Mono
public final Mono<T> log(@Nullable java.lang.String category, java.util.logging.Level level, boolean showOperatorLine, SignalType... options)
Observe Reactive Streams signals matching the passed filter options and use Logger support to handle trace implementation. Default will use the passed Level and java.util.logging. If SLF4J is available, it will be used instead.
Options allow fine grained filtering of the traced signal, for instance to only capture onNext and onError:
    mono.log("category", Level.INFO, SignalType.ON_NEXT, SignalType.ON_ERROR)
category - to be mapped into logger configuration (e.g. org.springframework.reactor). If category ends with "." like "reactor.", a generated operator suffix will complete, e.g. "reactor.Mono.Map".
level - the Level to enforce for this tracing Mono (only FINEST, FINE, INFO, WARNING and SEVERE are taken into account)
showOperatorLine - capture the current stack to display operator class/line number.
options - a vararg SignalType option to filter log messages
Mono
public final Mono<T> log(Logger logger)
Observe Reactive Streams signals matching the passed filter options and trace them using a specific user-provided Logger, at Level.INFO level.
public final Mono<T> log(Logger logger, java.util.logging.Level level, boolean showOperatorLine, SignalType... options)
Observe Reactive Streams signals matching the passed filter options and trace them using a specific user-provided Logger, at the given Level.
Options allow fine grained filtering of the traced signal, for instance to only capture onNext and onError:
    mono.log(myCustomLogger, Level.INFO, false, SignalType.ON_NEXT, SignalType.ON_ERROR)
logger - the Logger to use, instead of resolving one through a category.
level - the Level to enforce for this tracing Mono (only FINEST, FINE, INFO, WARNING and SEVERE are taken into account)
showOperatorLine - capture the current stack to display operator class/line number.
options - a vararg SignalType option to filter log messages
Mono that logs signals
public final <R> Mono<R> map(java.util.function.Function<? super T,? extends R> mapper)
Transform the item emitted by this Mono by applying a synchronous function to it.
R - the transformed type
mapper - the synchronous transforming Function
Mono
public final <R> Mono<R> mapNotNull(java.util.function.Function<? super T,? extends R> mapper)
Transform the item emitted by this Mono by applying a synchronous function to it, which is allowed to produce a null value. In that case, the resulting Mono completes immediately. This operator effectively behaves like map(Function) followed by filter(Predicate), although null is not a supported value, so it can't be filtered out.
R - the transformed type
mapper - the synchronous transforming Function
Mono
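For illustration, here is a sketch of mapNotNull with a lookup that may return null. The COUNTRY_BY_CODE table and the countryName helper are hypothetical names introduced for the example.

```java
import java.util.HashMap;
import java.util.Map;

import reactor.core.publisher.Mono;

class MapNotNullSketch {

    // Hypothetical lookup table; Map.get returns null for unknown codes.
    static final Map<String, String> COUNTRY_BY_CODE = new HashMap<>();
    static {
        COUNTRY_BY_CODE.put("FR", "France");
    }

    // Completes empty instead of failing when the mapper yields null.
    static Mono<String> countryName(String code) {
        return Mono.just(code).mapNotNull(COUNTRY_BY_CODE::get);
    }

    public static void main(String[] args) {
        System.out.println(countryName("FR").block());
        System.out.println(countryName("XX").defaultIfEmpty("unknown").block());
    }
}
```

With plain map(Function), a mapper returning null would instead terminate the Mono with an error, since null is not a legal onNext value.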
public final Mono<Signal<T>> materialize()
Transform incoming onNext, onError and onComplete signals into Signal instances, materializing these signals. Since the error is materialized as a Signal, the propagation will be stopped and onComplete will be emitted. A complete signal will first emit a Signal.complete() and then effectively complete the Mono. All these Signals have a Context associated with them.
Mono of materialized Signal
dematerialize()
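The following sketch (class and method names are illustrative) shows the three kinds of Signal that materialize() can produce for a Mono, and a round trip through dematerialize():

```java
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;

class MaterializeSketch {

    // Returns the first Signal emitted for the given source, whatever way it terminates.
    static Signal<String> capture(Mono<String> source) {
        return source.materialize().block();
    }

    // materialize() followed by dematerialize() yields the original value again.
    static String roundTrip(String value) {
        return Mono.just(value).materialize().<String>dematerialize().block();
    }

    public static void main(String[] args) {
        System.out.println(capture(Mono.just("v")));
        System.out.println(capture(Mono.<String>empty()));
        System.out.println(capture(Mono.<String>error(new IllegalStateException("boom"))));
        System.out.println(roundTrip("v"));
    }
}
```

Note that block() does not throw for the failing source here: the error has been turned into a regular onNext carrying an error Signal.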
public final Flux<T> mergeWith(org.reactivestreams.Publisher<? extends T> other)
Merge emissions of this Mono with the provided Publisher. The element from the Mono may be interleaved with the elements of the Publisher.
other - the Publisher to merge with
Flux as the sequence is not guaranteed to be at most 1
@Deprecated public final Mono<T> metrics()
Deprecated. Prefer using tap(SignalListenerFactory) with the SignalListenerFactory provided by the new reactor-core-micrometer module. To be removed in 3.6.0 at the earliest.
Metrics are gathered on Subscriber events, and it is recommended to also name (and optionally tag) the sequence.
The name serves as a prefix in the reported metrics names. In case no name has been provided, the default name "reactor" will be applied.
The MeterRegistry used by reactor can be configured via Metrics.MicrometerConfiguration.useRegistry(MeterRegistry) prior to using this operator, the default being Metrics.globalRegistry.
Mono
name(String), tag(String, String)
public final Mono<T> name(java.lang.String name)
Give a name to this sequence, which can be retrieved using Scannable.name() as long as this is the first reachable Scannable.parents().
The name is typically visible at assembly time by the tap(SignalListenerFactory) operator, which could for example be configured with a metrics listener using the name as a prefix for meters' id.
name
- a name for the sequence
metrics(), tag(String, String)
public final Mono<T> or(Mono<? extends T> other)
Emit the first available signal from this mono or the other mono.
other
- the other Mono, racing with this one to emit the first signal
Mono
firstWithSignal(reactor.core.publisher.Mono<? extends T>...)
public final <U> Mono<U> ofType(java.lang.Class<U> clazz)
Evaluate the emitted value against the given Class type. If the value matches the type, it is passed into the new Mono. Otherwise the value is ignored.
clazz
- the Class type to test values against
Mono filtered on the requested type
public final Mono<T> onErrorComplete()
Simply complete the sequence by replacing an onError signal with an onComplete signal. All other signals are propagated as-is.
Mono falling back on completion when an onError occurs
onErrorReturn(Object)
public final Mono<T> onErrorComplete(java.lang.Class<? extends java.lang.Throwable> type)
Simply complete the sequence by replacing an onError signal with an onComplete signal if the error matches the given Class. All other signals, including non-matching onError, are propagated as-is.
Mono falling back on completion when a matching error occurs
onErrorReturn(Class, Object)
public final Mono<T> onErrorComplete(java.util.function.Predicate<? super java.lang.Throwable> predicate)
Simply complete the sequence by replacing an onError signal with an onComplete signal if the error matches the given Predicate. In other words, the boolean returned by the Predicate decides whether the error is swallowed and the source is allowed to complete. All other signals, including non-matching onError, are propagated as-is.
Mono falling back on completion when a matching error occurs
onErrorReturn(Predicate, Object)
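The type-filtered variant can be sketched as below. The class OnErrorCompleteExample and its helpers are hypothetical; outcome() merely turns the result of block() into a string so that the three possible endings (value, empty, error) are easy to compare.

```java
import reactor.core.publisher.Mono;

public class OnErrorCompleteExample {
    // Hypothetical helper: describe how the Mono terminated.
    static String outcome(Mono<String> source) {
        try {
            String value = source.block();
            return value == null ? "empty" : "value:" + value;
        } catch (RuntimeException e) {
            return "error:" + e.getClass().getSimpleName();
        }
    }

    // The error matches the given class, so it is replaced by onComplete.
    static String swallowed() {
        return outcome(Mono.<String>error(new IllegalArgumentException("bad input"))
                           .onErrorComplete(IllegalArgumentException.class));
    }

    // The error does not match, so it is propagated as-is.
    static String propagated() {
        return outcome(Mono.<String>error(new IllegalStateException("other"))
                           .onErrorComplete(IllegalArgumentException.class));
    }
}
```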
public final Mono<T> onErrorContinue(java.util.function.BiConsumer<java.lang.Throwable,java.lang.Object> errorConsumer)
Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements. The recovered error and associated value are notified via the provided BiConsumer.
Alternatively, throwing from that biconsumer will propagate the thrown exception downstream in place of the original error, which is added as a suppressed exception to the new one.
This operator is offered on Mono mainly as a way to propagate the configuration to upstream Flux. The mode doesn't really make sense on a Mono, since we're sure there will be no further value to continue with. onErrorResume(Function) is a more classical fit.
Note that onErrorContinue() is a specialist operator that can make the behaviour of your reactive chain unclear. It operates on upstream, not downstream operators, it requires specific operator support to work, and the scope can easily propagate upstream into library code that didn't anticipate it (resulting in unintended behaviour).
In most cases, you should instead handle the error inside the specific function which may cause it. Specifically, on each inner publisher you can use doOnError to log the error, and onErrorResume(e -> Mono.empty()) to drop erroneous elements:
    .flatMap(id -> repository.retrieveById(id)
                             .doOnError(System.err::println)
                             .onErrorResume(e -> Mono.empty()))
This has the advantage of being much clearer, has no ambiguity with regards to operator support, and cannot leak upstream.
errorConsumer
- a BiConsumer fed with the errors and the value that triggered the error.
Mono that attempts to continue processing on errors.
public final <E extends java.lang.Throwable> Mono<T> onErrorContinue(java.lang.Class<E> type, java.util.function.BiConsumer<java.lang.Throwable,java.lang.Object> errorConsumer)
Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements. Only errors matching the specified type are recovered from. The recovered error and associated value are notified via the provided BiConsumer.
Alternatively, throwing from that biconsumer will propagate the thrown exception downstream in place of the original error, which is added as a suppressed exception to the new one.
This operator is offered on Mono mainly as a way to propagate the configuration to upstream Flux. The mode doesn't really make sense on a Mono, since we're sure there will be no further value to continue with. onErrorResume(Function) is a more classical fit.
Note that onErrorContinue() is a specialist operator that can make the behaviour of your reactive chain unclear. It operates on upstream, not downstream operators, it requires specific operator support to work, and the scope can easily propagate upstream into library code that didn't anticipate it (resulting in unintended behaviour).
In most cases, you should instead handle the error inside the specific function which may cause it. Specifically, on each inner publisher you can use doOnError to log the error, and onErrorResume(e -> Mono.empty()) to drop erroneous elements:
    .flatMap(id -> repository.retrieveById(id)
                             .doOnError(MyException.class, System.err::println)
                             .onErrorResume(MyException.class, e -> Mono.empty()))
This has the advantage of being much clearer, has no ambiguity with regards to operator support, and cannot leak upstream.
type
- the Class of Exception that are resumed from.
errorConsumer
- a BiConsumer fed with errors matching the Class and the value that triggered the error.
Mono that attempts to continue processing on some errors.
public final <E extends java.lang.Throwable> Mono<T> onErrorContinue(java.util.function.Predicate<E> errorPredicate, java.util.function.BiConsumer<java.lang.Throwable,java.lang.Object> errorConsumer)
Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements. Only errors matching the Predicate are recovered from (note that this predicate can be applied several times and thus must be idempotent). The recovered error and associated value are notified via the provided BiConsumer.
Alternatively, throwing from that biconsumer will propagate the thrown exception downstream in place of the original error, which is added as a suppressed exception to the new one.
This operator is offered on Mono mainly as a way to propagate the configuration to upstream Flux. The mode doesn't really make sense on a Mono, since we're sure there will be no further value to continue with. onErrorResume(Function) is a more classical fit.
Note that onErrorContinue() is a specialist operator that can make the behaviour of your reactive chain unclear. It operates on upstream, not downstream operators, it requires specific operator support to work, and the scope can easily propagate upstream into library code that didn't anticipate it (resulting in unintended behaviour).
In most cases, you should instead handle the error inside the specific function which may cause it. Specifically, on each inner publisher you can use doOnError to log the error, and onErrorResume(e -> Mono.empty()) to drop erroneous elements:
    .flatMap(id -> repository.retrieveById(id)
                             .doOnError(errorPredicate, System.err::println)
                             .onErrorResume(errorPredicate, e -> Mono.empty()))
This has the advantage of being much clearer, has no ambiguity with regards to operator support, and cannot leak upstream.
errorPredicate
- a Predicate used to filter which errors should be resumed from. This MUST be idempotent, as it can be used several times.
errorConsumer
- a BiConsumer fed with errors matching the predicate and the value that triggered the error.
Mono that attempts to continue processing on some errors.
public final Mono<T> onErrorStop()
If an onErrorContinue(BiConsumer) variant has been used downstream, reverts to the default 'STOP' mode where errors are terminal events upstream. It can be used for easier scoping of the on next failure strategy or to override the inherited strategy in a sub-stream (for example in a flatMap). It has no effect if onErrorContinue(BiConsumer) has not been used downstream.
Mono that terminates on errors, even if onErrorContinue(BiConsumer) was used downstream
public final Mono<T> onErrorMap(java.util.function.Predicate<? super java.lang.Throwable> predicate, java.util.function.Function<? super java.lang.Throwable,? extends java.lang.Throwable> mapper)
Transform an error emitted by this Mono by synchronously applying a function to it if the error matches the given predicate. Otherwise let the error pass through.
predicate
- the error predicate
mapper
- the error transforming Function
Mono that transforms some source errors to other errors
public final Mono<T> onErrorMap(java.util.function.Function<? super java.lang.Throwable,? extends java.lang.Throwable> mapper)
Transform any error emitted by this Mono by synchronously applying a function to it.
mapper
- the error transforming Function
Mono that transforms source errors to other errors
public final <E extends java.lang.Throwable> Mono<T> onErrorMap(java.lang.Class<E> type, java.util.function.Function<? super E,? extends java.lang.Throwable> mapper)
Transform an error emitted by this Mono by synchronously applying a function to it if the error matches the given type. Otherwise let the error pass through.
E
- the error type
type
- the class of the exception type to react to
mapper
- the error transforming Function
Mono that transforms some source errors to other errors
public final Mono<T> onErrorResume(java.util.function.Function<? super java.lang.Throwable,? extends Mono<? extends T>> fallback)
Subscribe to a fallback publisher when any error occurs, using a function to choose the fallback depending on the error.
fallback
- the function to choose the fallback to an alternative Mono
Mono falling back upon source onError
Flux.onErrorResume(java.util.function.Function<? super java.lang.Throwable, ? extends org.reactivestreams.Publisher<? extends T>>)
public final <E extends java.lang.Throwable> Mono<T> onErrorResume(java.lang.Class<E> type, java.util.function.Function<? super E,? extends Mono<? extends T>> fallback)
Subscribe to a fallback publisher when an error matching the given type occurs, using a function to choose the fallback depending on the error.
E
- the error type
type
- the error type to match
fallback
- the function to choose the fallback to an alternative Mono
Mono falling back upon source onError
Flux.onErrorResume(java.util.function.Function<? super java.lang.Throwable, ? extends org.reactivestreams.Publisher<? extends T>>)
public final Mono<T> onErrorResume(java.util.function.Predicate<? super java.lang.Throwable> predicate, java.util.function.Function<? super java.lang.Throwable,? extends Mono<? extends T>> fallback)
Subscribe to a fallback publisher when an error matching a given predicate occurs. The fallback function is invoked when the upstream signals a matching error and supplies a dynamically chosen replacement.
predicate
- the error predicate to match
fallback
- the function to choose the fallback to an alternative Mono
Mono falling back upon source onError
Flux.onErrorResume(java.util.function.Function<? super java.lang.Throwable, ? extends org.reactivestreams.Publisher<? extends T>>)
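As an illustration of onErrorResume, the sketch below falls back to an alternative Mono chosen from the error. Everything except the Mono operators is hypothetical: fetch stands in for a remote lookup, and the fallback text is invented for the example.

```java
import reactor.core.publisher.Mono;

public class OnErrorResumeExample {
    // Hypothetical source: fails for an empty id, otherwise emits a user name.
    static Mono<String> fetch(String id) {
        if (id.isEmpty()) {
            return Mono.<String>error(new IllegalArgumentException("empty id"));
        }
        return Mono.just("user-" + id);
    }

    // Only IllegalArgumentException triggers the fallback; the function
    // receives the error and builds the replacement Mono from it.
    static String load(String id) {
        return fetch(id)
                .onErrorResume(IllegalArgumentException.class,
                               e -> Mono.just("anonymous (" + e.getMessage() + ")"))
                .block();
    }
}
```

The fallback is a Mono, so it can itself be asynchronous (another lookup, a cache read), which is the main difference with onErrorReturn and its fixed value.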
public final Mono<T> onErrorReturn(T fallbackValue)
Simply emit a captured fallback value when any error is observed on this Mono.
fallbackValue
- the value to emit if an error occurs
Mono
onErrorComplete()
public final <E extends java.lang.Throwable> Mono<T> onErrorReturn(java.lang.Class<E> type, T fallbackValue)
Simply emit a captured fallback value when an error of the specified type is observed on this Mono.
E
- the error type
type
- the error type to match
fallbackValue
- the value to emit if an error occurs that matches the type
Mono
onErrorComplete(Class)
public final Mono<T> onErrorReturn(java.util.function.Predicate<? super java.lang.Throwable> predicate, T fallbackValue)
Simply emit a captured fallback value when an error matching the given predicate is observed on this Mono.
predicate
- the error predicate to match
fallbackValue
- the value to emit if an error occurs that matches the predicate
Mono
onErrorComplete(Predicate)
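A minimal sketch of onErrorReturn with a type filter. The class name, the parse helper and the choice of -1 as fallback are assumptions made for the illustration.

```java
import reactor.core.publisher.Mono;

public class OnErrorReturnExample {
    // Parse lazily inside the Mono so that a parsing failure becomes an
    // onError signal, then replace NumberFormatException with a fixed value.
    static int parse(String text) {
        return Mono.fromCallable(() -> Integer.parseInt(text))
                   .onErrorReturn(NumberFormatException.class, -1)
                   .block();
    }
}
```

The fallback value is captured when the chain is assembled, so it suits constants; a fallback that must be computed from the error is a better fit for onErrorResume.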
public final Mono<T> onTerminateDetach()
Detaches both the child Subscriber (the party that reacts to signals and processes the data) and the Subscription (the link through which data is requested from the Publisher) on termination or cancellation.
This should help with odd retention scenarios when running with non-reactor Subscriber.
Mono
public final <R> Mono<R> publish(java.util.function.Function<? super Mono<T>,? extends Mono<? extends R>> transform)
Share a Mono for the duration of a function that may transform it and consume it as many times as necessary without causing multiple subscriptions to the upstream.
R
- the output value type
transform
- the transformation function
Mono
public final Mono<T> publishOn(Scheduler scheduler)
Run onNext, onComplete and onError on a supplied Scheduler Worker.
This operator influences the threading context where the rest of the operators in the chain below it will execute, up to a new occurrence of publishOn.
Typically used for fast publisher, slow consumer(s) scenarios.
mono.publishOn(Schedulers.single()).subscribe()
scheduler
- a Scheduler providing the Scheduler.Worker where to publish
Mono
public final Flux<T> repeat()
Repeatedly and indefinitely subscribe to the source upon completion of the previous subscription.
Flux that repeats indefinitely on onComplete
public final Flux<T> repeat(java.util.function.BooleanSupplier predicate)
Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription.
predicate
- the boolean to evaluate on onComplete.
Flux that repeats on onComplete while the predicate matches
public final Flux<T> repeat(long numRepeat)
Repeatedly subscribe to the source numRepeat times. This results in numRepeat + 1 total subscriptions to the original source. As a consequence, using 0 plays the original sequence once.
numRepeat
- the number of times to re-subscribe on onComplete (positive, or 0 for original sequence only)
Flux that repeats on onComplete, up to the specified number of repetitions
public final Flux<T> repeat(long numRepeat, java.util.function.BooleanSupplier predicate)
Repeatedly subscribe to the source if the predicate returns true after completion of the previous subscription. The specified maximum number of repeats limits how many times the source is re-subscribed.
numRepeat
- the number of times to re-subscribe on complete (positive, or 0 for original sequence only)
predicate
- the boolean to evaluate on onComplete
Flux that repeats on onComplete while the predicate matches, up to the specified number of repetitions
public final Flux<T> repeatWhen(java.util.function.Function<Flux<java.lang.Long>,? extends org.reactivestreams.Publisher<?>> repeatFactory)
Repeatedly subscribe to this Mono when a companion sequence emits elements in response to the source completion signal. Any terminal signal from the companion sequence will terminate the resulting Flux with the same signal immediately.
If the companion sequence signals when this Mono is active, the repeat attempt is suppressed.
Note that if the companion Publisher created by the repeatFactory emits Context as trigger objects, the content of these Context will be added to the operator's own Context.
repeatFactory
- the Function that returns the associated Publisher companion, given a Flux that signals each onComplete as a Long representing the number of source elements emitted in the latest attempt (0 or 1).
Flux that repeats on onComplete when the companion Publisher produces an onNext signal
public final Mono<T> repeatWhenEmpty(java.util.function.Function<Flux<java.lang.Long>,? extends org.reactivestreams.Publisher<?>> repeatFactory)
public final Mono<T> repeatWhenEmpty(int maxRepeat, java.util.function.Function<Flux<java.lang.Long>,? extends org.reactivestreams.Publisher<?>> repeatFactory)
Repeatedly subscribe to this Mono as long as the current subscription to this Mono completes empty and the companion Publisher produces an onNext signal.
Any terminal signal will terminate the resulting Mono with the same signal immediately.
Emits an IllegalStateException if maxRepeat is exceeded (provided it is different from Integer.MAX_VALUE).
maxRepeat
- the maximum number of repeats (infinite if Integer.MAX_VALUE)
repeatFactory
- the Function that returns the associated Publisher companion, given a Flux that signals each onComplete as a 0-based incrementing Long.
Mono that resubscribes to this Mono if the previous subscription was empty, as long as the companion Publisher produces an onNext signal and the maximum number of repeats isn't exceeded.
public final Mono<T> retry()
Re-subscribes to this Mono sequence if it signals any error, indefinitely.
Mono that retries on onError
public final Mono<T> retry(long numRetries)
Re-subscribes to this Mono sequence if it signals any error, for a fixed number of times.
Note that passing Long.MAX_VALUE is treated as infinite retry.
numRetries
- the number of times to tolerate an error
Mono that retries on onError up to the specified number of retry attempts.
public final Mono<T> retryWhen(Retry retrySpec)
Retries this Mono in response to signals emitted by a companion Publisher, which allows a custom retry strategy to be expressed through a Retry (for instance a RetrySpec).
The companion is generated by the provided Retry instance, see Retry.max(long), Retry.maxInARow(long) and Retry.backoff(long, Duration) for readily available strategy builders.
The operator generates a base for the companion, a Flux of Retry.RetrySignal which each give metadata about each retryable failure whenever this Mono signals an error. The final companion should be derived from that base companion and emit data in response to incoming onNext (although it can emit fewer elements, or delay the emissions).
Terminal signals in the companion terminate the sequence with the same signal, so emitting a Subscriber.onError(Throwable) will fail the resulting Mono with that same error.
Note that the Retry.RetrySignal state can be transient and change between each source onError or onNext. If processed with a delay, this could lead to the represented state being out of sync with the state at which the retry was evaluated. Map it to Retry.RetrySignal.copy() right away to mitigate this.
Note that if the companion Publisher created by the Retry strategy emits Context as trigger objects, these Context will be merged with the previous Context:
    Retry customStrategy = Retry.from(companion -> companion.handle((retrySignal, sink) -> {
        Context ctx = sink.currentContext();
        int rl = ctx.getOrDefault("retriesLeft", 0);
        if (rl > 0) {
            sink.next(Context.of(
                "retriesLeft", rl - 1,
                "lastError", retrySignal.failure()
            ));
        } else {
            sink.error(Exceptions.retryExhausted("retries exhausted", retrySignal.failure()));
        }
    }));
    Mono<T> retried = originalMono.retryWhen(customStrategy);
retrySpec
- the Retry strategy that will generate the companion Publisher, given a Flux that signals each onError as a Retry.RetrySignal.
Mono that retries on onError when a companion Publisher produces an onNext signal
Retry.max(long), Retry.maxInARow(long), Retry.backoff(long, Duration)
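For the common case, one of the readily available builders is usually enough. The sketch below uses Retry.max(3) on a source that fails a configurable number of times; the class name, callFlaky and the attempt counter are hypothetical and exist only to make the number of subscriptions observable.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public class RetryWhenExample {
    // Hypothetical flaky call: fails until the given number of failures
    // has been reached, counting every subscription in `attempts`.
    static String callFlaky(AtomicInteger attempts, int failuresBeforeSuccess) {
        return Mono.fromCallable(() -> {
                       int n = attempts.incrementAndGet();
                       if (n <= failuresBeforeSuccess) {
                           throw new IllegalStateException("attempt " + n + " failed");
                       }
                       return "ok after " + n + " attempts";
                   })
                   .retryWhen(Retry.max(3)) // original attempt + at most 3 retries
                   .block();
    }

    // When the source never succeeds, the retries are exhausted and
    // block() throws; the counter shows how many subscriptions happened.
    static int attemptsWhenAlwaysFailing() {
        AtomicInteger attempts = new AtomicInteger();
        try {
            callFlaky(attempts, Integer.MAX_VALUE);
        } catch (RuntimeException e) {
            // expected in this sketch: the retry strategy gave up
        }
        return attempts.get();
    }
}
```

Retry.backoff(long, Duration) can be swapped in where retries should be spaced out in time; it was left out of the sketch to keep the run time deterministic.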
public final Mono<T> share()
Prepare a Mono which shares this Mono result similar to Flux.shareNext().
This will effectively turn this Mono into a hot task when the first Subscriber subscribes using subscribe() API. Further Subscriber will share the same Subscription and therefore the same result.
It's worth noting this is an un-cancellable Subscription.
Mono
public final Mono<T> single()
Expect exactly one item from this Mono source or signal NoSuchElementException for an empty source.
Note Mono doesn't need Flux.single(Object), since it is equivalent to defaultIfEmpty(Object) in a Mono.
Mono with the single item or an error signal
public final Disposable subscribe()
Subscribe to this Mono and request unbounded demand.
This version doesn't specify any consumption behavior for the events from the chain, especially no error handling, so other variants should usually be preferred.
Disposable that can be used to cancel the underlying Subscription
public final Disposable subscribe(java.util.function.Consumer<? super T> consumer)
Subscribe a Consumer to this Mono that will consume all the sequence. It will request an unbounded demand (Long.MAX_VALUE).
For a passive version that observes and forwards incoming data see doOnNext(java.util.function.Consumer).
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
consumer
- the consumer to invoke on each value (onNext signal)
Disposable that can be used to cancel the underlying Subscription
public final Disposable subscribe(@Nullable java.util.function.Consumer<? super T> consumer, java.util.function.Consumer<? super java.lang.Throwable> errorConsumer)
Subscribe to this Mono with a Consumer that will consume all the elements in the sequence, as well as a Consumer that will handle errors. The subscription will request an unbounded demand (Long.MAX_VALUE).
For a passive version that observes and forwards incoming data see doOnSuccess(Consumer) and doOnError(java.util.function.Consumer).
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
consumer
- the consumer to invoke on each next signal
errorConsumer
- the consumer to invoke on error signal
Disposable that can be used to cancel the underlying Subscription
public final Disposable subscribe(@Nullable java.util.function.Consumer<? super T> consumer, @Nullable java.util.function.Consumer<? super java.lang.Throwable> errorConsumer, @Nullable java.lang.Runnable completeConsumer)
Subscribe Consumer to this Mono that will respectively consume all the elements in the sequence, handle errors and react to completion. The subscription will request unbounded demand (Long.MAX_VALUE).
For a passive version that observes and forwards incoming data see doOnSuccess(Consumer) and doOnError(java.util.function.Consumer).
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
consumer
- the consumer to invoke on each value
errorConsumer
- the consumer to invoke on error signal
completeConsumer
- the consumer to invoke on complete signal
Disposable that can be used to cancel the underlying Subscription
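The three-callback variant can be sketched as follows. The class SubscribeExample and the observe helper are hypothetical; the sources used in the test are synchronous, so the list is already filled when subscribe returns, which would not be guaranteed for an asynchronous source.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Mono;

public class SubscribeExample {
    // Record which callbacks were invoked, in order.
    static List<String> observe(Mono<String> source) {
        List<String> events = new ArrayList<>();
        source.subscribe(
                value -> events.add("next:" + value),          // onNext
                error -> events.add("error:" + error.getMessage()), // onError
                () -> events.add("complete"));                  // onComplete
        return events;
    }
}
```

For an asynchronous source, the caller would have to wait (for instance on a latch, or by using block() in tests) before inspecting the recorded events.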
public final Disposable subscribe(@Nullable java.util.function.Consumer<? super T> consumer, @Nullable java.util.function.Consumer<? super java.lang.Throwable> errorConsumer, @Nullable java.lang.Runnable completeConsumer, @Nullable java.util.function.Consumer<? super org.reactivestreams.Subscription> subscriptionConsumer)
Subscribe Consumer to this Mono that will respectively consume all the elements in the sequence, handle errors, react to completion, and request upon subscription. It will let the provided subscriptionConsumer request the adequate amount of data, or request unbounded demand Long.MAX_VALUE if no such consumer is provided.
For a passive version that observes and forwards incoming data see doOnSuccess(Consumer) and doOnError(java.util.function.Consumer).
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
consumer
- the consumer to invoke on each value
errorConsumer
- the consumer to invoke on error signal
completeConsumer
- the consumer to invoke on complete signal
subscriptionConsumer
- the consumer to invoke on subscribe signal, to be used for the initial request, or null for max request
Disposable that can be used to cancel the underlying Subscription
public final Disposable subscribe(@Nullable java.util.function.Consumer<? super T> consumer, @Nullable java.util.function.Consumer<? super java.lang.Throwable> errorConsumer, @Nullable java.lang.Runnable completeConsumer, @Nullable Context initialContext)
Subscribe Consumer to this Mono that will respectively consume all the elements in the sequence, handle errors and react to completion. Additionally, a Context is tied to the subscription. At subscription, an unbounded request is implicitly made.
For a passive version that observes and forwards incoming data see doOnSuccess(Consumer) and doOnError(java.util.function.Consumer).
Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.
consumer
- the consumer to invoke on each value
errorConsumer
- the consumer to invoke on error signal
completeConsumer
- the consumer to invoke on complete signal
initialContext
- the Context for the subscription
Disposable that can be used to cancel the underlying Subscription
public final void subscribe(org.reactivestreams.Subscriber<? super T> actual)
Specified by: subscribe in interface org.reactivestreams.Publisher<T>
public abstract void subscribe(CoreSubscriber<? super T> actual)
An internal Publisher.subscribe(Subscriber) that will bypass Hooks.onLastOperator(Function) pointcut.
In addition to behaving as expected by Publisher.subscribe(Subscriber) in a controlled manner, it supports direct subscribe-time Context passing.
Specified by: subscribe in interface CorePublisher<T>
actual
- the Subscriber interested in the published sequence
Publisher.subscribe(Subscriber)
public final Mono<T> subscribeOn(Scheduler scheduler)
Run subscribe, onSubscribe and request on a specified Scheduler's Scheduler.Worker.
As such, placing this operator anywhere in the chain will also impact the execution context of onNext/onError/onComplete signals from the beginning of the chain up to the next occurrence of a publishOn.
mono.subscribeOn(Schedulers.parallel()).subscribe()
scheduler
- a Scheduler providing the Scheduler.Worker where to subscribe
Mono requesting asynchronously
publishOn(Scheduler)
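The respective effects of subscribeOn and publishOn can be observed by capturing the thread name at different points of the chain. This is only a sketch: the class and method names are hypothetical, and the exact thread names are an implementation detail of the default Schedulers (they are typically prefixed with the scheduler's name, which is what the example relies on).

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class SchedulerExample {
    // subscribeOn moves the subscription, and therefore the execution of
    // the source callable, onto a worker of the parallel Scheduler.
    static String sourceThread() {
        return Mono.fromCallable(() -> Thread.currentThread().getName())
                   .subscribeOn(Schedulers.parallel())
                   .block();
    }

    // publishOn moves the signals emitted below it onto a worker of the
    // single Scheduler, so the map function runs on that thread.
    static String downstreamThread() {
        return Mono.just("value")
                   .publishOn(Schedulers.single())
                   .map(v -> Thread.currentThread().getName())
                   .block();
    }
}
```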
public final <E extends org.reactivestreams.Subscriber<? super T>> E subscribeWith(E subscriber)
Subscribe the given Subscriber to this Mono and return said Subscriber, allowing subclasses with a richer API to be used fluently.
E
- the reified type of the Subscriber for chaining
subscriber
- the Subscriber to subscribe with
Subscriber after subscribing it to this Mono
public final Mono<T> switchIfEmpty(Mono<? extends T> alternate)
Fall back to an alternative Mono if this mono completes without data.
alternate
- the alternate mono if this mono is empty
Mono falling back upon source completing without elements
Flux.switchIfEmpty(org.reactivestreams.Publisher<? extends T>)
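A small sketch of switchIfEmpty, assuming a hypothetical in-memory cache and a made-up "default" fallback value:

```java
import java.util.Map;
import reactor.core.publisher.Mono;

public class SwitchIfEmptyExample {
    // Mono.justOrEmpty turns a null lookup result into an empty Mono,
    // which switchIfEmpty then replaces with the alternate Mono.
    static String find(Map<String, String> cache, String key) {
        return Mono.justOrEmpty(cache.get(key))
                   .switchIfEmpty(Mono.just("default"))
                   .block();
    }
}
```

The alternate Mono is only subscribed to when the source completes empty, but the expression building it is evaluated when the chain is assembled; wrapping an expensive alternate in Mono.defer is a common way to postpone that work.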
public final Mono<T> tag(java.lang.String key, java.lang.String value)
Tag this mono with a key/value pair. These can be retrieved as a Set of all tags throughout the publisher chain by using Scannable.tags() (as traversed by Scannable.parents()).
The tags are typically visible at assembly time by the tap(SignalListenerFactory) operator, which could for example be configured with a metrics listener applying the tag(s) to its meters.
key
- a tag key
value
- a tag value
name(String), metrics()
public final Mono<T> take(java.time.Duration duration)
Give this Mono a chance to resolve within a specified time frame but complete if it doesn't. This works a bit like timeout(Duration) except that the resulting Mono completes rather than errors when the timer expires.
The timeframe is evaluated using the parallel Scheduler.
duration
- the maximum duration to wait for the source Mono to resolve.
Mono that will propagate the signals from the source unless no signal is received for duration, in which case it completes.
public final Mono<T> take(java.time.Duration duration, Scheduler timer)
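Give this Mono a chance to resolve within a specified time frame but complete if it doesn't. This works a bit like timeout(Duration) except that the resulting Mono completes rather than errors when the timer expires.
The timeframe is evaluated using the provided Scheduler.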
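The behaviour of take(Duration) can be sketched with a source that never emits and one that emits immediately. The class name and the chosen durations are arbitrary values picked for the illustration.

```java
import java.time.Duration;
import reactor.core.publisher.Mono;

public class TakeDurationExample {
    // The source never signals, so the timer wins and the resulting Mono
    // completes empty: block() returns null rather than throwing.
    static String slowSource() {
        return Mono.<String>never()
                   .take(Duration.ofMillis(50))
                   .block();
    }

    // The source resolves well within the time frame, so its value is relayed.
    static String fastSource() {
        return Mono.just("fast")
                   .take(Duration.ofSeconds(1))
                   .block();
    }
}
```

With timeout(Duration) in place of take(Duration), slowSource() would end with an error instead of an empty completion.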
public final Mono<T> takeUntilOther(org.reactivestreams.Publisher<?> other)
Give this Mono a chance to resolve before a companion Publisher emits. If the companion emits before any signal from the source, the resulting Mono will complete. Otherwise, it will relay signals from the source.
other
- a companion Publisher that shortcircuits the source with an onComplete signal if it emits before the source emits.
Mono that will propagate the signals from the source unless a signal is first received from the companion Publisher, in which case it completes.
public final Mono<T> tap(java.util.function.Supplier<SignalListener<T>> simpleListenerGenerator)
Tap into Reactive Streams signals emitted or received by this Mono and notify a stateful per-Subscriber SignalListener.
Any exception thrown by the SignalListener methods causes the subscription to be cancelled and the subscriber to be terminated with an onError signal of that exception. Note that SignalListener.doFinally(SignalType), SignalListener.doAfterComplete() and SignalListener.doAfterError(Throwable) instead just drop the exception.
This simplified variant assumes the state is purely initialized within the Supplier, as it is called for each incoming Subscriber without additional context.
When the context-propagation library is available at runtime and the downstream ContextView is not empty, this operator implicitly uses the library to restore thread locals around all invocations of SignalListener methods. Typically, this would be done in conjunction with the use of contextCapture() operator down the chain.
simpleListenerGenerator
- the Supplier to create a new SignalListener on each subscription
Mono with side effects defined by generated SignalListener
tap(Function), tap(SignalListenerFactory)
public final Mono<T> tap(java.util.function.Function<ContextView,SignalListener<T>> listenerGenerator)
Tap into Reactive Streams signals emitted or received by this Mono and notify a stateful per-Subscriber SignalListener.
Any exception thrown by the SignalListener methods causes the subscription to be cancelled and the subscriber to be terminated with an onError signal of that exception. Note that SignalListener.doFinally(SignalType), SignalListener.doAfterComplete() and SignalListener.doAfterError(Throwable) instead just drop the exception.
This simplified variant allows the SignalListener to be constructed for each subscription with access to the incoming Subscriber's ContextView.
When the context-propagation library is available at runtime and the ContextView is not empty, this operator implicitly uses the library to restore thread locals around all invocations of SignalListener methods. Typically, this would be done in conjunction with the use of contextCapture() operator down the chain.
listenerGenerator
- the Function to create a new SignalListener on each subscription
Mono with side effects defined by generated SignalListener
tap(Supplier), tap(SignalListenerFactory)
public final Mono<T> tap(SignalListenerFactory<T,?> listenerFactory)
Tap into Reactive Streams signals emitted or received by this Mono and notify a stateful per-Subscriber SignalListener created by the provided SignalListenerFactory.
The factory will initialize a state object for each Flux or Mono instance it is used with, and that state will be cached and exposed for each incoming Subscriber in order to generate the associated listener.
Any exception thrown by the SignalListener methods causes the subscription to be cancelled and the subscriber to be terminated with an onError signal of that exception. Note that SignalListener.doFinally(SignalType), SignalListener.doAfterComplete() and SignalListener.doAfterError(Throwable) instead just drop the exception.
When the context-propagation library is available at runtime and the downstream ContextView is not empty, this operator implicitly uses the library to restore thread locals around all invocations of SignalListener methods. Typically, this would be done in conjunction with the use of contextCapture() operator down the chain.
Parameters:
listenerFactory - the SignalListenerFactory to create a new SignalListener on each subscription
Returns:
a Mono with side effects defined by generated SignalListener
See Also:
tap(Supplier), tap(Function)
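As an illustration of the per-subscription listener described above, here is a minimal sketch using the tap(Function) variant. It assumes reactor-core 3.5 or later on the classpath and uses DefaultSignalListener (the no-op base class from reactor.core.observability) so that only two callbacks need overriding. The class name TapExample, the "RequestID" key and the recorded strings are hypothetical and chosen only for the example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.observability.DefaultSignalListener;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

public class TapExample {

    // Subscribes once and returns the events recorded by the listener.
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();

        Mono<String> tapped = Mono.just("payload")
            // tap(Function): a fresh listener is built for each subscription,
            // with read access to that subscriber's ContextView.
            .tap(ctx -> new DefaultSignalListener<String>() {
                // Hypothetical context key, read once per subscription.
                final String requestId = ctx.getOrDefault("RequestID", "none");

                @Override
                public void doOnNext(String value) {
                    events.add(requestId + " next:" + value);
                }

                @Override
                public void doOnComplete() {
                    events.add(requestId + " complete");
                }
            });

        // The Context is written downstream of tap, so the listener can see it.
        tapped.contextWrite(Context.of("RequestID", "requestA")).block();
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because the listener is created inside the Function, any state it holds (here the requestId field) is private to one subscription rather than shared between Subscribers.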
public final Mono<java.lang.Void> then()
Return a Mono<Void> which only replays complete and error signals from this Mono. It is typically used to run a next step that does not need the emitted value.
Discard Support: This operator discards the element from the source.
Returns:
a Mono ignoring its payload (actively dropping)
public final <V> Mono<V> then(Mono<V> other)
Let this Mono complete, then play another Mono.
In other words, ignore the element from this Mono and transform its completion signal into the emission and completion signal of a provided Mono<V>. An error signal is replayed in the resulting Mono<V>.
Discard Support: This operator discards the element from the source.
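The following sketch shows how then() and then(Mono) treat the source element and an error signal. It assumes reactor-core on the classpath. The class name ThenExample, the "row-42" value and the log strings are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Mono;

public class ThenExample {

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // A source with a visible side effect and a value that will be discarded.
        Mono<String> save = Mono.fromCallable(() -> {
            log.add("saved");
            return "row-42";
        });

        // then(): the element is dropped, only the completion is relayed,
        // so block() has no value to return.
        Void nothing = save.then().block();
        log.add("then() -> " + nothing);

        // then(Mono): once the source completes, the second Mono supplies the value.
        String next = save.then(Mono.just("done")).block();
        log.add("then(Mono) -> " + next);

        // An error in the source is replayed; the second Mono is not played.
        String recovered = Mono.<String>error(new IllegalStateException("boom"))
            .then(Mono.just("unreachable"))
            .onErrorReturn("error replayed")
            .block();
        log.add(recovered);

        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Note that the source is subscribed once per block() call, which is why its side effect runs twice in this sketch.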
public final <V> Mono<V> thenReturn(V value)
Let this Mono complete successfully, then emit the provided value. On an error in the original Mono, the error signal is propagated instead.
Discard Support: This operator discards the element from the source.
Type Parameters:
V - the element type of the supplied value
Parameters:
value - a value to emit after successful termination
Returns:
a Mono that emits the supplied value
public final Mono<java.lang.Void> thenEmpty(org.reactivestreams.Publisher<java.lang.Void> other)
Return a Mono<Void> that waits for this Mono to complete then for a supplied Publisher<Void> to also complete. The second completion signal is replayed, or any error signal that occurs instead.
Discard Support: This operator discards the element from the source.
Parameters:
other - a Publisher to wait for after this Mono's termination
Returns:
a Mono completing when both publishers have completed in sequence
public final <V> Flux<V> thenMany(org.reactivestreams.Publisher<V> other)
Let this Mono complete successfully, then play another Publisher. On an error in the original Mono, the error signal is propagated instead.
In other words, ignore the element from this Mono and transform the completion signal into a Flux<V> that will emit elements from the provided Publisher.
Discard Support: This operator discards the element from the source.
Type Parameters:
V - the element type of the supplied Publisher
Parameters:
other - a Publisher to emit from after termination
Returns:
a Flux that emits from the supplied Publisher after this Mono completes.
public final Mono<Timed<T>> timed()
Times this Mono's Subscriber.onNext(Object) event, encapsulated into a Timed object that lets downstream consumers look at various time information gathered with nanosecond resolution using the default clock (Schedulers.parallel()):
Timed.elapsed(): the time in nanoseconds since subscription, as a Duration. This is functionally equivalent to elapsed(), with a more expressive and precise representation than a Tuple2 with a long.
Timed.timestamp(): the timestamp of this onNext, as an Instant (with nanoseconds part). This is functionally equivalent to timestamp(), with a more expressive and precise representation than a Tuple2 with a long.
Timed.elapsedSinceSubscription(): for Mono this is the same as Timed.elapsed().
The Timed object instances are safe to store and use later, as they are created as an immutable wrapper around the <T> value and immediately passed downstream.
Returns:
a timed Mono
See Also:
elapsed(), timestamp()
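To make the Timed accessors concrete, here is a small sketch that delays a value and then reads the timing information. It assumes reactor-core 3.4 or later on the classpath. The class name TimedExample and the 20 ms delay are hypothetical.

```java
import java.time.Duration;

import reactor.core.publisher.Mono;
import reactor.core.publisher.Timed;

public class TimedExample {

    static Timed<String> run() {
        return Mono.just("value")
            // Delay the element so that some time passes after subscription.
            .delayElement(Duration.ofMillis(20))
            // Wrap the onNext value together with its timing information.
            .timed()
            .block();
    }

    public static void main(String[] args) {
        Timed<String> timed = run();
        System.out.println("value     = " + timed.get());
        System.out.println("elapsed   = " + timed.elapsed());
        System.out.println("timestamp = " + timed.timestamp());
        // For a Mono, both elapsed accessors describe the same interval.
        System.out.println("same      = "
            + timed.elapsed().equals(timed.elapsedSinceSubscription()));
    }
}
```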
public final Mono<Timed<T>> timed(Scheduler clock)
Times this Mono's Subscriber.onNext(Object) event, encapsulated into a Timed object that lets downstream consumers look at various time information gathered with nanosecond resolution using the provided Scheduler as a clock:
Timed.elapsed(): the time in nanoseconds since subscription, as a Duration. This is functionally equivalent to elapsed(), with a more expressive and precise representation than a Tuple2 with a long.
Timed.timestamp(): the timestamp of this onNext, as an Instant (with nanoseconds part). This is functionally equivalent to timestamp(), with a more expressive and precise representation than a Tuple2 with a long.
Timed.elapsedSinceSubscription(): for Mono this is the same as Timed.elapsed().
The Timed object instances are safe to store and use later, as they are created as an immutable wrapper around the <T> value and immediately passed downstream.
Returns:
a timed Mono
See Also:
elapsed(Scheduler), timestamp(Scheduler)
public final Mono<T> timeout(java.time.Duration timeout)
Propagate a TimeoutException in case no item arrives within the given Duration.
public final Mono<T> timeout(java.time.Duration timeout, Scheduler timer)
Signal a TimeoutException error in case an item doesn't arrive before the given period, as measured on the provided Scheduler.
public final Mono<T> timeout(java.time.Duration timeout, @Nullable Mono<? extends T> fallback, Scheduler timer)
public final <U> Mono<T> timeout(org.reactivestreams.Publisher<U> firstTimeout)
Signal a TimeoutException in case the item from this Mono has not been emitted before the given Publisher emits.
public final <U> Mono<T> timeout(org.reactivestreams.Publisher<U> firstTimeout, Mono<? extends T> fallback)
Switch to a fallback Publisher in case the item from this Mono has not been emitted before the given Publisher emits.
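The timeout variants above can be sketched as follows: one call lets the TimeoutException surface and recovers from it, the other switches to a fallback when the companion Publisher emits first. This assumes reactor-core on the classpath. The class name TimeoutExample, the durations and the returned strings are hypothetical.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;

import reactor.core.publisher.Mono;

public class TimeoutExample {

    // timeout(Duration): the source is too slow, so a TimeoutException is signalled.
    static String durationTimeout() {
        Mono<String> slow = Mono.just("late").delayElement(Duration.ofSeconds(5));
        return slow.timeout(Duration.ofMillis(50))
            // Recover only from the timeout, to make the signal visible.
            .onErrorResume(TimeoutException.class, e -> Mono.just("timed out"))
            .block();
    }

    // timeout(Publisher, Mono): the companion Publisher emits before the source,
    // so the operator switches to the fallback Mono.
    static String publisherTimeoutWithFallback() {
        Mono<String> never = Mono.never();
        return never.timeout(Mono.delay(Duration.ofMillis(50)), Mono.just("fallback"))
            .block();
    }

    public static void main(String[] args) {
        System.out.println(durationTimeout());
        System.out.println(publisherTimeoutWithFallback());
    }
}
```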
Type Parameters:
U - the element type of the timeout Publisher
Parameters:
firstTimeout - the timeout Publisher that must not emit before the first signal from this Mono
fallback - the fallback Publisher to subscribe when a timeout occurs
Returns:
a Mono with a fallback Mono if the item doesn't come before a Publisher signals
public final Mono<Tuple2<java.lang.Long,T>> timestamp(Scheduler scheduler)
If this Mono is valued, emit a Tuple2 pair of T1 the current clock time in millis (as a Long measured by the provided Scheduler) and T2 the emitted data (as a T).
The provided Scheduler will be asked to provide time with a granularity of TimeUnit.MILLISECONDS. In order for this operator to work as advertised, the provided Scheduler should thus return results that can be interpreted as unix timestamps.
Parameters:
scheduler - a Scheduler instance to read time from
Returns:
a timestamped Mono
See Also:
Scheduler.now(TimeUnit), timed(Scheduler)
public final java.util.concurrent.CompletableFuture<T> toFuture()
Transform this Mono into a CompletableFuture completing on onNext or onComplete and failing on onError.
Returns:
a CompletableFuture
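The three outcomes described above (value, empty completion, error) can be observed with a short sketch. It assumes reactor-core on the classpath. The class name ToFutureExample and its method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

import reactor.core.publisher.Mono;

public class ToFutureExample {

    // onNext: the future completes with the emitted value.
    static String valued() {
        return Mono.just("hello").toFuture().join();
    }

    // onComplete without a value: the future completes with null.
    static Object empty() {
        return Mono.empty().toFuture().join();
    }

    // onError: the future completes exceptionally.
    static boolean failed() {
        CompletableFuture<String> future =
            Mono.<String>error(new IllegalStateException("boom")).toFuture();
        return future.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(valued());
        System.out.println(empty());
        System.out.println(failed());
    }
}
```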
public final <V> Mono<V> transform(java.util.function.Function<? super Mono<T>,? extends org.reactivestreams.Publisher<V>> transformer)
Transform this Mono in order to generate a target Mono. Unlike transformDeferred(Function), the provided function is executed as part of assembly.
Function<Mono<Integer>, Mono<Integer>> applySchedulers = m -> m.subscribeOn(Schedulers.boundedElastic())
                                                             .publishOn(Schedulers.parallel());
mono.transform(applySchedulers).map(v -> v * v).subscribe();
Type Parameters:
V - the item type in the returned Mono
Parameters:
transformer - the Function to immediately map this Mono into a target Mono instance.
Returns:
a new Mono
See Also:
transformDeferred(Function) for deferred composition of Mono for each Subscriber, as(Function) for a loose conversion to an arbitrary type
public final <V> Mono<V> transformDeferred(java.util.function.Function<? super Mono<T>,? extends org.reactivestreams.Publisher<V>> transformer)
Defer the given transformation to this Mono in order to generate a target Mono type. A transformation will occur for each Subscriber. For instance:
mono.transformDeferred(original -> original.log());
Type Parameters:
V - the item type in the returned Publisher
Parameters:
transformer - the Function to lazily map this Mono into a target Mono instance upon subscription.
Returns:
a new Mono
See Also:
transform(Function) for immediate transformation of Mono, transformDeferredContextual(BiFunction) for a similarly deferred transformation of Mono reading the ContextView, as(Function) for a loose conversion to an arbitrary type
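The difference between assembly-time and per-subscription transformation can be made visible by counting how often the transformer function runs. The sketch below assumes reactor-core on the classpath. The class name TransformExample and the counters are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;

public class TransformExample {

    // Returns {eager calls after assembly, eager calls after two subscriptions,
    //          lazy calls after assembly,  lazy calls after two subscriptions}.
    static int[] run() {
        AtomicInteger eagerCalls = new AtomicInteger();
        AtomicInteger lazyCalls = new AtomicInteger();

        // transform: the function runs immediately, while the chain is assembled.
        Mono<Integer> eager = Mono.just(1).transform(m -> {
            eagerCalls.incrementAndGet();
            return m.map(v -> v + 1);
        });
        int eagerAfterAssembly = eagerCalls.get();

        // transformDeferred: the function runs once for each Subscriber.
        Mono<Integer> lazy = Mono.just(1).transformDeferred(m -> {
            lazyCalls.incrementAndGet();
            return m.map(v -> v + 1);
        });
        int lazyAfterAssembly = lazyCalls.get();

        // Subscribe twice to each Mono.
        eager.block();
        eager.block();
        lazy.block();
        lazy.block();

        return new int[] {
            eagerAfterAssembly, eagerCalls.get(), lazyAfterAssembly, lazyCalls.get()
        };
    }

    public static void main(String[] args) {
        for (int count : run()) {
            System.out.println(count);
        }
    }
}
```

The deferred variant is the one to reach for when the transformer captures state that should not be shared between Subscribers.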
public final <V> Mono<V> transformDeferredContextual(java.util.function.BiFunction<? super Mono<T>,? super ContextView,? extends org.reactivestreams.Publisher<V>> transformer)
Defer the given transformation to this Mono in order to generate a target Mono type. A transformation will occur for each Subscriber. In addition, the transforming BiFunction exposes the ContextView of each Subscriber. For instance:
Mono<T> monoLogged = mono.transformDeferredContextual((original, ctx) -> original.log("for RequestID" + ctx.get("RequestID")));
//...later subscribe. Each subscriber has its Context with a RequestID entry
monoLogged.contextWrite(Context.of("RequestID", "requestA")).subscribe();
monoLogged.contextWrite(Context.of("RequestID", "requestB")).subscribe();
Type Parameters:
V - the item type in the returned Publisher
Parameters:
transformer - the BiFunction to lazily map this Mono into a target Mono instance upon subscription, with access to ContextView
Returns:
a new Mono
See Also:
transform(Function) for immediate transformation of Mono, transformDeferred(Function) for a similarly deferred transformation of Mono without the ContextView, as(Function) for a loose conversion to an arbitrary type
public final <T2> Mono<Tuple2<T,T2>> zipWhen(java.util.function.Function<T,Mono<? extends T2>> rightGenerator)
Wait for the result from this Mono, use it to create a second Mono via the provided rightGenerator function and combine both results into a Tuple2.
Type Parameters:
T2 - the element type of the other Mono instance
Parameters:
rightGenerator - the Function to generate a Mono to combine with
public final <T2,O> Mono<O> zipWhen(java.util.function.Function<T,Mono<? extends T2>> rightGenerator, java.util.function.BiFunction<T,T2,O> combinator)
Wait for the result from this Mono, use it to create a second Mono via the provided rightGenerator function and combine both results into an arbitrary O object, as defined by the provided combinator function.
Type Parameters:
T2 - the element type of the other Mono instance
O - the element type of the combination
Parameters:
rightGenerator - the Function to generate a Mono to combine with
combinator - a BiFunction combinator function when both sources complete
public final <T2> Mono<Tuple2<T,T2>> zipWith(Mono<? extends T2> other)
Combine the result from this Mono and another into a Tuple2.
An error or empty completion of any source will cause the other source to be cancelled and the resulting Mono to immediately error or complete, respectively.
Type Parameters:
T2 - the element type of the other Mono instance
Parameters:
other - the Mono to combine with
public final <T2,O> Mono<O> zipWith(Mono<? extends T2> other, java.util.function.BiFunction<? super T,? super T2,? extends O> combinator)
Combine the result from this Mono and another into an arbitrary O object, as defined by the provided combinator function.
An error or empty completion of any source will cause the other source to be cancelled and the resulting Mono to immediately error or complete, respectively.
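A short sketch of the zip operators may help here: zipWith pairs two independent sources, zipWhen derives the second source from the first result, and an empty source makes the combination complete empty. It assumes reactor-core on the classpath. The class name ZipExample and the sample values are hypothetical.

```java
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

public class ZipExample {

    // zipWith(Mono): both results are combined into a Tuple2.
    static String zipWithTuple() {
        Tuple2<String, Integer> pair = Mono.just("a").zipWith(Mono.just(1)).block();
        return pair.getT1() + pair.getT2();
    }

    // zipWhen(Function, BiFunction): the second Mono depends on the first value,
    // and the combinator decides the shape of the result.
    static String zipWhenCombined() {
        return Mono.just("hello")
            .zipWhen(s -> Mono.just(s.length()), (s, n) -> s + ":" + n)
            .block();
    }

    // An empty source makes the zipped Mono complete without a value.
    static Object zipWithEmpty() {
        return Mono.just("a").zipWith(Mono.<Integer>empty()).block();
    }

    public static void main(String[] args) {
        System.out.println(zipWithTuple());
        System.out.println(zipWhenCombined());
        System.out.println(zipWithEmpty());
    }
}
```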
Type Parameters:
T2 - the element type of the other Mono instance
O - the element type of the combination
Parameters:
other - the Mono to combine with
combinator - a BiFunction combinator function when both sources complete
public java.lang.String toString()
Overrides:
toString in class java.lang.Object