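Type Parameters:
T - the value type
Deprecated. To be removed in 3.5. Prefer using Sinks through the variations under Sinks.many().replay().
@Deprecated public final class ReplayProcessor<T> extends FluxProcessor<T,T> implements Fuseable
Nested classes/interfaces inherited from interface Fuseable:
Fuseable.ConditionalSubscriber<T>, Fuseable.QueueSubscription<T>, Fuseable.ScalarCallable<T>, Fuseable.SynchronousSubscription<T>
Nested classes/interfaces inherited from interface Scannable:
Scannable.Attr<T>
Nested classes/interfaces inherited from interface Disposable:
Disposable.Composite, Disposable.Swap
Fields inherited from interface Scannable:
OPERATOR_NAME_UNRELATED_WORDS_PATTERN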
Modifier and Type | Method and Description |
---|---|
Flux<T> | asFlux() Deprecated. Return a Flux view of this sink. |
static <T> ReplayProcessor<T> | cacheLast() Deprecated. Use Sinks.many().replay().latest() (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5. |
static <T> ReplayProcessor<T> | cacheLastOrDefault(T value) Deprecated. Use Sinks.many().replay().latestOrDefault(value) (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5. |
static <E> ReplayProcessor<E> | create() Deprecated. Use Sinks.many().replay().all() (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5. |
static <E> ReplayProcessor<E> | create(int historySize) Deprecated. Use Sinks.many().replay().limit(historySize) (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5. |
static <E> ReplayProcessor<E> | create(int historySize, boolean unbounded) Deprecated. Use Sinks.many().replay().limit(historySize) for bounded cases (unbounded == false) or Sinks.many().replay().all(bufferSize) otherwise (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5. |
static <T> ReplayProcessor<T> | createSizeAndTimeout(int size, java.time.Duration maxAge) Deprecated. Use Sinks.many().replay().limit(size, maxAge) (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5. |
static <T> ReplayProcessor<T> | createSizeAndTimeout(int size, java.time.Duration maxAge, Scheduler scheduler) Deprecated. Use Sinks.many().replay().limit(size, maxAge, scheduler) (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5. |
static <T> ReplayProcessor<T> | createTimeout(java.time.Duration maxAge) Deprecated. Use Sinks.many().replay().limit(maxAge) (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5. |
static <T> ReplayProcessor<T> | createTimeout(java.time.Duration maxAge, Scheduler scheduler) Deprecated. Use Sinks.many().replay().limit(maxAge, scheduler) (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5. |
Context | currentContext() Deprecated. Request a Context from dependent components which can include downstream operators during subscribing or a terminal Subscriber. |
int | currentSubscriberCount() Deprecated. Get how many Subscribers are currently subscribed to the sink. |
long | downstreamCount() Deprecated. Return the number of active Subscribers, or -1 if untracked. |
default void | emitComplete(Sinks.EmitFailureHandler failureHandler) A simplified attempt at completing via the Sinks.Many.tryEmitComplete() API, generating an onComplete signal. |
default void | emitError(java.lang.Throwable error, Sinks.EmitFailureHandler failureHandler) A simplified attempt at failing the sequence via the Sinks.Many.tryEmitError(Throwable) API, generating an onError signal. |
default void | emitNext(T value, Sinks.EmitFailureHandler failureHandler) A simplified attempt at emitting a non-null element via the Sinks.Many.tryEmitNext(Object) API, generating an onNext signal. |
java.lang.Throwable | getError() Deprecated. Current error if any, defaults to null. |
int | getPrefetch() Deprecated. The prefetch configuration of the Flux. |
java.util.stream.Stream<? extends Scannable> | inners() Deprecated. Return a Stream of referenced inners (flatmap, multicast etc). |
boolean | isTerminated() Deprecated. Has this upstream finished, i.e. "completed" or "failed"? |
void | onComplete() Deprecated. |
void | onError(java.lang.Throwable throwable) Deprecated. |
void | onNext(T t) Deprecated. |
void | onSubscribe(org.reactivestreams.Subscription s) Deprecated. Implementors should initialize any state used by Subscriber.onNext(Object) before calling Subscription.request(long). |
java.lang.Object | scanUnsafe(Scannable.Attr key) Deprecated. This method is used internally by components to define their key-value mappings in a single place. |
void | subscribe(CoreSubscriber<? super T> actual) Deprecated. An internal Publisher.subscribe(Subscriber) that will bypass the Hooks.onLastOperator(Function) pointcut. |
Sinks.EmitResult | tryEmitComplete() Deprecated. Try to terminate the sequence successfully, generating an onComplete signal. |
Sinks.EmitResult | tryEmitError(java.lang.Throwable t) Deprecated. Try to fail the sequence, generating an onError signal. |
Sinks.EmitResult | tryEmitNext(T t) Deprecated. Try emitting a non-null element, generating an onNext signal. |
Methods inherited from class FluxProcessor:
dispose, getBufferSize, hasCompleted, hasDownstreams, hasError, isSerialized, serialize, sink, sink, switchOnNext, wrap
Methods inherited from class Flux:
all, any, as, blockFirst, blockFirst, blockLast, blockLast, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferTimeout, bufferTimeout, bufferTimeout, bufferTimeout, bufferUntil, bufferUntil, bufferUntilChanged, bufferUntilChanged, bufferUntilChanged, bufferWhen, bufferWhen, bufferWhile, cache, cache, cache, cache, cache, cache, cancelOn, cast, checkpoint, checkpoint, checkpoint, collect, collect, collectList, collectMap, collectMap, collectMap, collectMultimap, collectMultimap, collectMultimap, collectSortedList, collectSortedList, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, concat, concat, concat, concat, concatDelayError, concatDelayError, concatDelayError, concatDelayError, concatMap, concatMap, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapIterable, concatMapIterable, concatWith, concatWithValues, contextCapture, contextWrite, contextWrite, count, create, create, defaultIfEmpty, defer, deferContextual, delayElements, delayElements, delaySequence, delaySequence, delaySubscription, delaySubscription, delaySubscription, delayUntil, dematerialize, distinct, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterTerminate, doFinally, doFirst, doOnCancel, doOnComplete, doOnDiscard, doOnEach, doOnError, doOnError, doOnError, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, elapsed, elapsed, elementAt, elementAt, empty, error, error, error, expand, expand, expandDeep, expandDeep, filter, filterWhen, filterWhen, first, first, firstWithSignal, firstWithSignal, firstWithValue, firstWithValue, flatMap, flatMap, flatMap, flatMap, flatMapDelayError, flatMapIterable, flatMapIterable, flatMapSequential, flatMapSequential, flatMapSequential, flatMapSequentialDelayError, from, fromArray, fromIterable, fromStream, fromStream, generate, generate, generate, groupBy, 
groupBy, groupBy, groupBy, groupJoin, handle, hasElement, hasElements, hide, ignoreElements, index, index, interval, interval, interval, interval, join, just, just, last, last, limitRate, limitRate, limitRequest, log, log, log, log, log, log, map, mapNotNull, materialize, merge, merge, merge, merge, merge, merge, mergeComparing, mergeComparing, mergeComparing, mergeComparingDelayError, mergeComparingWith, mergeDelayError, mergeOrdered, mergeOrdered, mergeOrdered, mergeOrderedWith, mergePriority, mergePriority, mergePriority, mergePriorityDelayError, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequentialDelayError, mergeSequentialDelayError, mergeSequentialDelayError, mergeWith, metrics, name, never, next, ofType, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureDrop, onBackpressureDrop, onBackpressureError, onBackpressureLatest, onErrorComplete, onErrorComplete, onErrorComplete, onErrorContinue, onErrorContinue, onErrorContinue, onErrorMap, onErrorMap, onErrorMap, onErrorResume, onErrorResume, onErrorResume, onErrorReturn, onErrorReturn, onErrorReturn, onErrorStop, onTerminateDetach, or, parallel, parallel, parallel, publish, publish, publish, publish, publishNext, publishOn, publishOn, publishOn, push, push, range, reduce, reduce, reduceWith, repeat, repeat, repeat, repeat, repeatWhen, replay, replay, replay, replay, replay, replay, retry, retry, retryWhen, sample, sample, sampleFirst, sampleFirst, sampleTimeout, sampleTimeout, scan, scan, scanWith, share, shareNext, single, single, singleOrEmpty, skip, skip, skip, skipLast, skipUntil, skipUntilOther, skipWhile, sort, sort, startWith, startWith, startWith, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchMap, switchOnFirst, 
switchOnFirst, switchOnNext, switchOnNext, tag, take, take, take, take, takeLast, takeUntil, takeUntilOther, takeWhile, tap, tap, tap, then, then, thenEmpty, thenMany, timed, timed, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timestamp, timestamp, toIterable, toIterable, toIterable, toStream, toStream, toString, transform, transformDeferred, transformDeferredContextual, using, using, usingWhen, usingWhen, window, window, window, window, window, window, window, windowTimeout, windowTimeout, windowTimeout, windowTimeout, windowUntil, windowUntil, windowUntil, windowUntilChanged, windowUntilChanged, windowUntilChanged, windowWhen, windowWhile, windowWhile, withLatestFrom, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipWith, zipWith, zipWith, zipWith, zipWithIterable, zipWithIterable
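Methods inherited from interface Fuseable:
fusionModeName, fusionModeName
Methods inherited from interface Scannable:
actuals, from, isScanAvailable, name, parents, scan, scanOrDefault, stepName, steps, tags, tagsDeduplicated
Methods inherited from interface Disposable:
isDisposed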
@Deprecated public static <T> ReplayProcessor<T> cacheLast()
Deprecated. Use Sinks.many().replay().latest() (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5.
Creates a ReplayProcessor that caches the last element it has pushed, replaying it to late subscribers. This is a buffer-based ReplayProcessor with a history size of 1.
Type Parameters:
T - the type of the pushed elements
Returns:
a ReplayProcessor that replays its last pushed element to each new Subscriber
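The following is a minimal sketch of the replacement named in the deprecation note, assuming reactor-core 3.4 or later is on the classpath. The class name LatestReplayExample and the method lateSubscriberView are illustrative names, not part of the library. It shows a subscriber that arrives after three emissions receiving only the last one, followed by live elements.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Sinks;

public class LatestReplayExample {

    // Emits three values, subscribes late, then emits one more value.
    // Returns everything the late subscriber observed.
    static List<String> lateSubscriberView() {
        // Replacement suggested by the deprecation note of ReplayProcessor.cacheLast()
        Sinks.Many<String> sink = Sinks.many().replay().latest();

        sink.tryEmitNext("a");
        sink.tryEmitNext("b");
        sink.tryEmitNext("c");

        List<String> seen = new ArrayList<>();
        // Late subscription: only the cached last element is replayed
        sink.asFlux().subscribe(seen::add);

        // Elements emitted after the subscription are delivered live
        sink.tryEmitNext("d");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(lateSubscriberView()); // prints [c, d]
    }
}
```

The return values of tryEmitNext are ignored here for brevity; production code would normally check the returned Sinks.EmitResult.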
@Deprecated public static <T> ReplayProcessor<T> cacheLastOrDefault(@Nullable T value)
Deprecated. Use Sinks.many().replay().latestOrDefault(value) (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5.
Creates a ReplayProcessor that caches the last element it has pushed, replaying it to late subscribers. If a Subscriber comes in before any value has been pushed, then the default value is emitted instead. This is a buffer-based ReplayProcessor with a history size of 1.
Type Parameters:
T - the type of the pushed elements
Parameters:
value - a default value to start the sequence with in case nothing has been cached yet
Returns:
a ReplayProcessor that replays its last pushed element to each new Subscriber, or a default one if nothing was pushed yet
@Deprecated public static <E> ReplayProcessor<E> create()
Deprecated. Use Sinks.many().replay().all() (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5.
Creates a ReplayProcessor that replays an unbounded number of elements, using a default internal Queue.
Type Parameters:
E - the type of the pushed elements
Returns:
a ReplayProcessor that replays the whole history to each new Subscriber
@Deprecated public static <E> ReplayProcessor<E> create(int historySize)
Deprecated. Use Sinks.many().replay().limit(historySize) (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5.
Creates a ReplayProcessor that replays up to historySize elements.
Type Parameters:
E - the type of the pushed elements
Parameters:
historySize - the backlog size, i.e. the maximum number of items retained for replay
Returns:
a ReplayProcessor that replays a limited history to each new Subscriber
@Deprecated public static <E> ReplayProcessor<E> create(int historySize, boolean unbounded)
Deprecated. Use Sinks.many().replay().limit(historySize) for bounded cases (unbounded == false) or Sinks.many().replay().all(bufferSize) otherwise (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5.
Creates a ReplayProcessor that either replays all the elements or a limited number of elements, depending on the unbounded parameter.
Type Parameters:
E - the type of the pushed elements
Parameters:
historySize - maximum items retained if bounded, or initial link size if unbounded
unbounded - true if an "unlimited" data store must be supplied
Returns:
a ReplayProcessor that replays the whole history to each new Subscriber if configured as unbounded, a limited history otherwise
@Deprecated public static <T> ReplayProcessor<T> createTimeout(java.time.Duration maxAge)
Deprecated. Use Sinks.many().replay().limit(maxAge) (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5.
In this setting, the ReplayProcessor internally tags each observed item with a timestamp value supplied by Schedulers.parallel() and keeps only those whose age is less than the supplied time value converted to milliseconds. For example, an item arrives at T=0 and the max age is set to 5; at T>=5 this first item is then evicted by any subsequent item or termination signal, leaving the buffer empty.
Once the processor is terminated, subscribers subscribing to it will receive items that remained in the buffer after the terminal signal, regardless of their age.
If a subscriber subscribes while the ReplayProcessor is active, it will observe only those items from within the buffer that have an age less than the specified time, and each item observed thereafter, even if the buffer evicts items due to the time constraint in the meantime. In other words, once a subscriber subscribes, it observes items without gaps in the sequence except for any outdated items at the beginning of the sequence.
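As an illustration of the age-based behavior described above, here is a small sketch using the replacement API Sinks.many().replay().limit(maxAge), assuming reactor-core 3.4 or later. The class and method names, the 100 ms maximum age and the 300 ms pause are arbitrary choices made for the example. It relies on the wall clock, so it only asserts what follows from the description: an item older than the maximum age is not replayed to a late subscriber.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Sinks;

public class AgeLimitedReplayExample {

    // Emits "old", waits well past the max age, emits "fresh", then subscribes.
    static List<String> lateSubscriberView() {
        // Time-bounded replay; timestamps come from the default clock
        Sinks.Many<String> sink = Sinks.many().replay().limit(Duration.ofMillis(100));

        sink.tryEmitNext("old");
        pause(300); // "old" is now older than the 100 ms max age
        sink.tryEmitNext("fresh");

        List<String> seen = new ArrayList<>();
        sink.asFlux().subscribe(seen::add);
        return seen;
    }

    // Sleeps without forcing callers to handle InterruptedException
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(lateSubscriberView());
    }
}
```

On an unloaded machine the late subscriber would typically see only "fresh". Passing an explicit Scheduler to limit(maxAge, scheduler) is the usual way to make such timing deterministic in tests.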
Type Parameters:
T - the type of items observed and emitted by the Processor
Parameters:
maxAge - the maximum age of the contained items
Returns:
a ReplayProcessor that replays elements based on their age
@Deprecated public static <T> ReplayProcessor<T> createTimeout(java.time.Duration maxAge, Scheduler scheduler)
Deprecated. Use Sinks.many().replay().limit(maxAge, scheduler) (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5.
In this setting, the ReplayProcessor internally tags each observed item with a timestamp value supplied by the Scheduler and keeps only those whose age is less than the supplied time value converted to milliseconds. For example, an item arrives at T=0 and the max age is set to 5; at T>=5 this first item is then evicted by any subsequent item or termination signal, leaving the buffer empty.
Once the processor is terminated, subscribers subscribing to it will receive items that remained in the buffer after the terminal signal, regardless of their age.
If a subscriber subscribes while the ReplayProcessor is active, it will observe only those items from within the buffer that have an age less than the specified time, and each item observed thereafter, even if the buffer evicts items due to the time constraint in the meantime. In other words, once a subscriber subscribes, it observes items without gaps in the sequence except for any outdated items at the beginning of the sequence.
Type Parameters:
T - the type of items observed and emitted by the Processor
Parameters:
maxAge - the maximum age of the contained items
Returns:
a ReplayProcessor that replays elements based on their age
@Deprecated public static <T> ReplayProcessor<T> createSizeAndTimeout(int size, java.time.Duration maxAge)
Deprecated. Use Sinks.many().replay().limit(size, maxAge) (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5.
In this setting, the ReplayProcessor internally tags each received item with a timestamp value supplied by Schedulers.parallel() and holds at most size items in its internal buffer. It evicts items from the start of the buffer if their age becomes greater than or equal to the supplied age in milliseconds or the buffer reaches its size limit.
When subscribers subscribe to a terminated ReplayProcessor, they observe the items that remained in the buffer after the terminal signal, regardless of their age, but at most size items.
If a subscriber subscribes while the ReplayProcessor is active, it will observe only those items from within the buffer that have an age less than the specified time, and each subsequent item, even if the buffer evicts items due to the time constraint in the meantime. In other words, once a subscriber subscribes, it observes items without gaps in the sequence except for the outdated items at the beginning of the sequence.
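The eviction rules above can be modeled with plain JDK collections. The sketch below is a deliberately simplified, hypothetical model (the class BoundedHistoryModel and all of its members are invented for illustration) and is not Reactor's implementation: it ignores termination, concurrency and backpressure, and takes its clock as a LongSupplier standing in for the Scheduler.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical, simplified model of a size- and age-bounded history. */
public class BoundedHistoryModel<T> {

    private static final class Entry<T> {
        final T value;
        final long timestampMillis;

        Entry(T value, long timestampMillis) {
            this.value = value;
            this.timestampMillis = timestampMillis;
        }
    }

    private final Deque<Entry<T>> buffer = new ArrayDeque<>();
    private final int size;
    private final long maxAgeMillis;
    private final LongSupplier clock; // stands in for the Scheduler's clock

    public BoundedHistoryModel(int size, long maxAgeMillis, LongSupplier clock) {
        this.size = size;
        this.maxAgeMillis = maxAgeMillis;
        this.clock = clock;
    }

    /** Timestamps the item, appends it, then evicts from the start of the buffer. */
    public void push(T item) {
        long now = clock.getAsLong();
        buffer.addLast(new Entry<>(item, now));
        // Size bound: keep at most 'size' items
        while (buffer.size() > size) {
            buffer.removeFirst();
        }
        // Age bound: drop items whose age has reached maxAge
        while (!buffer.isEmpty() && now - buffer.peekFirst().timestampMillis >= maxAgeMillis) {
            buffer.removeFirst();
        }
    }

    /** Items a subscriber arriving now would be replayed: only those younger than maxAge. */
    public List<T> snapshotForNewSubscriber() {
        long now = clock.getAsLong();
        List<T> replayed = new ArrayList<>();
        for (Entry<T> e : buffer) {
            if (now - e.timestampMillis < maxAgeMillis) {
                replayed.add(e.value);
            }
        }
        return replayed;
    }

    /** Walks through a small scenario with size = 2 and maxAge = 5 time units. */
    static List<List<String>> demo() {
        long[] now = {0L};
        BoundedHistoryModel<String> history = new BoundedHistoryModel<>(2, 5, () -> now[0]);
        history.push("a");              // T=0
        now[0] = 1; history.push("b");  // T=1
        now[0] = 2; history.push("c");  // T=2, size limit evicts "a"
        List<String> atTwo = history.snapshotForNewSubscriber();
        now[0] = 6;                     // "b" is now 5 units old, "c" is 4
        List<String> atSix = history.snapshotForNewSubscriber();
        return List.of(atTwo, atSix);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [[b, c], [c]]
    }
}
```

Note that in this model an outdated item can stay in the buffer until the next push, yet it is still skipped for a new subscriber, which mirrors the "outdated items at the beginning of the sequence" wording.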
Type Parameters:
T - the type of items observed and emitted by the Processor
Parameters:
maxAge - the maximum age of the contained items
size - the maximum number of buffered items
Returns:
a ReplayProcessor that replays up to size elements, but will evict them from its history based on their age
@Deprecated public static <T> ReplayProcessor<T> createSizeAndTimeout(int size, java.time.Duration maxAge, Scheduler scheduler)
Deprecated. Use Sinks.many().replay().limit(size, maxAge, scheduler) (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5.
In this setting, the ReplayProcessor internally tags each received item with a timestamp value supplied by the Scheduler and holds at most size items in its internal buffer. It evicts items from the start of the buffer if their age becomes greater than or equal to the supplied age in milliseconds or the buffer reaches its size limit.
When subscribers subscribe to a terminated ReplayProcessor, they observe the items that remained in the buffer after the terminal signal, regardless of their age, but at most size items.
If a subscriber subscribes while the ReplayProcessor is active, it will observe only those items from within the buffer that have an age less than the specified time, and each subsequent item, even if the buffer evicts items due to the time constraint in the meantime. In other words, once a subscriber subscribes, it observes items without gaps in the sequence except for the outdated items at the beginning of the sequence.
Type Parameters:
T - the type of items observed and emitted by the Processor
Parameters:
maxAge - the maximum age of the contained items
size - the maximum number of buffered items
scheduler - the Scheduler that provides the current time
Returns:
a ReplayProcessor that replays up to size elements, but will evict them from its history based on their age
public void subscribe(CoreSubscriber<? super T> actual)
Description copied from class: Flux
An internal Publisher.subscribe(Subscriber) that will bypass the Hooks.onLastOperator(Function) pointcut.
In addition to behaving as expected by Publisher.subscribe(Subscriber) in a controlled manner, it supports direct subscribe-time Context passing.
Specified by:
subscribe in interface CorePublisher<T>
Specified by:
subscribe in class Flux<T>
Parameters:
actual - the Subscriber interested in the published sequence
See Also:
Flux.subscribe(Subscriber)
@Nullable public java.lang.Throwable getError()
Description copied from class: FluxProcessor
Current error if any, defaults to null.
Overrides:
getError in class FluxProcessor<T,T>
@Nullable public java.lang.Object scanUnsafe(Scannable.Attr key)
Description copied from interface: Scannable
This method is used internally by components to define their key-value mappings in a single place. Although the generic type of the Scannable.Attr key is ignored, implementors should take care to return values of the correct type, and return null if no specific value is available.
For public consumption of attributes, prefer using Scannable.scan(Attr), which will return a typed value and fall back to the key's default if the component didn't define any mapping.
Specified by:
scanUnsafe in interface Scannable
Overrides:
scanUnsafe in class FluxProcessor<T,T>
Parameters:
key - a Scannable.Attr to resolve for the component
public java.util.stream.Stream<? extends Scannable> inners()
Description copied from interface: Scannable
Return a Stream of referenced inners (flatmap, multicast etc).
public long downstreamCount()
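Description copied from class: FluxProcessor
Return the number of active Subscribers, or -1 if untracked.
Overrides:
downstreamCount in class FluxProcessor<T,T>
Returns:
the number of active Subscribers, or -1 if untracked
public boolean isTerminated()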
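Description copied from class: FluxProcessor
Has this upstream finished, i.e. "completed" or "failed"?
Overrides:
isTerminated in class FluxProcessor<T,T>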
public void onSubscribe(org.reactivestreams.Subscription s)
Description copied from interface: CoreSubscriber
Implementors should initialize any state used by Subscriber.onNext(Object) before calling Subscription.request(long). Should further onNext related state modification occur, thread-safety will be required.
Note that an invalid request <= 0 will not produce an onError and will simply be ignored or reported through a debug-enabled Logger.
Specified by:
onSubscribe in interface org.reactivestreams.Subscriber<T>
Specified by:
onSubscribe in interface CoreSubscriber<T>
public Context currentContext()
Description copied from interface: CoreSubscriber
Request a Context from dependent components which can include downstream operators during subscribing or a terminal Subscriber.
Specified by:
currentContext in interface CoreSubscriber<T>
Overrides:
currentContext in class FluxProcessor<T,T>
Returns:
a resolved context, or Context.empty()
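public int getPrefetch()
Description copied from class: Flux
The prefetch configuration of the Flux.
Overrides:
getPrefetch in class Flux<T>
Returns:
the prefetch configuration of the Flux, -1 if unspecified
public void onComplete()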
Specified by:
onComplete in interface org.reactivestreams.Subscriber<T>
public Sinks.EmitResult tryEmitComplete()
Description copied from interface: Sinks.Many
Try to terminate the sequence successfully, generating an onComplete signal. The result of the attempt is represented as a Sinks.EmitResult, which possibly indicates error cases.
See the list of failure Sinks.EmitResult values in the emitComplete(EmitFailureHandler) javadoc for an example of how each of these can be dealt with, to decide if the emit API would be a good enough fit instead.
Returns:
a Sinks.EmitResult, which should be checked to distinguish different possible failures
See Also:
Subscriber.onComplete()
public void onError(java.lang.Throwable throwable)
Specified by:
onError in interface org.reactivestreams.Subscriber<T>
public Sinks.EmitResult tryEmitError(java.lang.Throwable t)
Description copied from interface: Sinks.Many
Try to fail the sequence, generating an onError signal. The result of the attempt is represented as a Sinks.EmitResult, which possibly indicates error cases.
See the list of failure Sinks.EmitResult values in the emitError(Throwable, EmitFailureHandler) javadoc for an example of how each of these can be dealt with, to decide if the emit API would be a good enough fit instead.
Parameters:
t - the exception to signal, not null
Returns:
a Sinks.EmitResult, which should be checked to distinguish different possible failures
See Also:
Subscriber.onError(Throwable)
public Sinks.EmitResult tryEmitNext(T t)
Description copied from interface: Sinks.Many
Try emitting a non-null element, generating an onNext signal. The result of the attempt is represented as a Sinks.EmitResult, which possibly indicates error cases.
See the list of failure Sinks.EmitResult values in the emitNext(Object, EmitFailureHandler) javadoc for an example of how each of these can be dealt with, to decide if the emit API would be a good enough fit instead.
Might throw an unchecked exception as a last resort (e.g. in case of a fatal error downstream which cannot be propagated to any asynchronous handler, a bubbling exception, ...).
Parameters:
t - the value to emit, not null
Returns:
a Sinks.EmitResult, which should be checked to distinguish different possible failures
See Also:
Subscriber.onNext(Object)
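To make the "should be checked" advice concrete, here is a small sketch, assuming reactor-core 3.4 or later and a replay sink obtained from Sinks.many().replay().limit(2). The class name TryEmitExample and its method are illustrative. It contrasts an emission that succeeds with one attempted after the sink has completed.

```java
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitResult;

public class TryEmitExample {

    // Returns the results of emitting before and after completion.
    static EmitResult[] emitBeforeAndAfterCompletion() {
        Sinks.Many<Integer> sink = Sinks.many().replay().limit(2);

        // A replay sink buffers the element even though nobody has subscribed yet
        EmitResult beforeComplete = sink.tryEmitNext(1);

        sink.tryEmitComplete();

        // The sink is terminated, so this element is rejected rather than emitted
        EmitResult afterComplete = sink.tryEmitNext(2);

        return new EmitResult[] { beforeComplete, afterComplete };
    }

    public static void main(String[] args) {
        for (EmitResult result : emitBeforeAndAfterCompletion()) {
            if (result.isFailure()) {
                System.out.println("emission rejected: " + result);
            }
            else {
                System.out.println("emission accepted: " + result);
            }
        }
    }
}
```

Here the first result is EmitResult.OK and the second is EmitResult.FAIL_TERMINATED; what to do with a rejected element (log, drop, retry elsewhere) is left to the caller.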
public int currentSubscriberCount()
Description copied from interface: Sinks.Many
Get how many Subscribers are currently subscribed to the sink.
This is a best effort peek at the sink state, and a subsequent attempt at emitting to the sink might still return Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER where relevant (generally in Sinks.Many.tryEmitNext(Object)). Request (and lack thereof) isn't taken into account: all registered subscribers are counted.
public Flux<T> asFlux()
Description copied from interface: Sinks.Many
Return a Flux view of this sink. Every call returns the same instance.
Returns:
the Flux view associated to this Sinks.Many
public void emitNext(T value, Sinks.EmitFailureHandler failureHandler)
Description copied from interface: Sinks.Many
A simplified attempt at emitting a non-null element via the Sinks.Many.tryEmitNext(Object) API, generating an onNext signal. If the result of the attempt is not a success, implementations SHOULD retry the Sinks.Many.tryEmitNext(Object) call IF the provided Sinks.EmitFailureHandler returns true. Otherwise, failures are dealt with in a predefined way that might depend on the actual sink implementation (see below for the vanilla reactor-core behavior).
Generally, Sinks.Many.tryEmitNext(Object) is preferable since it allows a custom handling of error cases, although this implies checking the returned Sinks.EmitResult and correctly acting on it. This API is intended as a good default for convenience.
When the Sinks.EmitResult is not a success, vanilla reactor-core operators have the following behavior:
Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER: no particular handling. The value should ideally be discarded, but at that point there's no Subscriber from which to get a contextual discard handler.
Sinks.EmitResult.FAIL_OVERFLOW: discard the value (Operators.onDiscard(Object, Context)), then call Sinks.Many.emitError(Throwable, Sinks.EmitFailureHandler) with an Exceptions.failWithOverflow(String) exception.
Sinks.EmitResult.FAIL_CANCELLED: discard the value (Operators.onDiscard(Object, Context)).
Sinks.EmitResult.FAIL_TERMINATED: drop the value (Operators.onNextDropped(Object, Context)).
Sinks.EmitResult.FAIL_NON_SERIALIZED: throw a Sinks.EmissionException mentioning RS spec rule 1.3. Note that Sinks.unsafe() never triggers this result. It would be possible for a Sinks.EmitFailureHandler to busy-loop and optimistically wait for the contention to disappear to avoid this case for safe sinks.
Might throw an unchecked exception as a last resort (e.g. in case of a fatal error downstream which cannot be propagated to any asynchronous handler, a bubbling exception, a Sinks.EmitResult.FAIL_NON_SERIALIZED as described above, ...).
Specified by:
emitNext in interface Sinks.Many<T>
Parameters:
value - the value to emit, not null
failureHandler - the failure handler that allows retrying failed Sinks.EmitResult
See Also:
Sinks.Many.tryEmitNext(Object), Subscriber.onNext(Object)
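The retry contract above can be sketched as follows, assuming reactor-core 3.4 or later. The helper retryContention is a hypothetical handler written for this example, not a library method: it asks for a retry only on FAIL_NON_SERIALIZED and only until a deadline, so that the busy-loop mentioned above stays bounded. The built-in Sinks.EmitFailureHandler.FAIL_FAST never retries.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitFailureHandler;
import reactor.core.publisher.Sinks.EmitResult;

public class EmitNextHandlerExample {

    /** Hypothetical handler: retry only contention failures, and only until maxWait has elapsed. */
    static EmitFailureHandler retryContention(Duration maxWait) {
        long deadline = System.nanoTime() + maxWait.toNanos();
        return (SignalType signalType, EmitResult result) ->
                result == EmitResult.FAIL_NON_SERIALIZED
                        && System.nanoTime() - deadline < 0; // true means "retry"
    }

    // Emits two elements with different handlers and returns what a subscriber observed.
    static List<String> demo() {
        Sinks.Many<String> sink = Sinks.many().replay().all();
        List<String> seen = new ArrayList<>();
        sink.asFlux().subscribe(seen::add);

        // No retry at all: any failure is handled in the predefined way
        sink.emitNext("a", EmitFailureHandler.FAIL_FAST);
        // Bounded retry if another thread happens to be emitting concurrently
        sink.emitNext("b", retryContention(Duration.ofMillis(100)));

        sink.emitComplete(EmitFailureHandler.FAIL_FAST);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a, b]
    }
}
```

In this single-threaded run no failure occurs, so neither handler is actually invoked; the handler only matters when several threads emit into the same safe sink.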
public void emitComplete(Sinks.EmitFailureHandler failureHandler)
Description copied from interface: Sinks.Many
A simplified attempt at completing via the Sinks.Many.tryEmitComplete() API, generating an onComplete signal. If the result of the attempt is not a success, implementations SHOULD retry the Sinks.Many.tryEmitComplete() call IF the provided Sinks.EmitFailureHandler returns true. Otherwise, failures are dealt with in a predefined way that might depend on the actual sink implementation (see below for the vanilla reactor-core behavior).
Generally, Sinks.Many.tryEmitComplete() is preferable since it allows a custom handling of error cases, although this implies checking the returned Sinks.EmitResult and correctly acting on it. This API is intended as a good default for convenience.
When the Sinks.EmitResult is not a success, vanilla reactor-core operators have the following behavior:
Sinks.EmitResult.FAIL_OVERFLOW: irrelevant as onComplete is not driven by backpressure.
Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER: the completion can be ignored since nobody is listening. Note that most vanilla reactor sinks never trigger this result for onComplete, replaying the terminal signal to later subscribers instead (with the exception of Sinks.UnicastSpec.onBackpressureError()).
Sinks.EmitResult.FAIL_CANCELLED: the completion can be ignored since nobody is interested.
Sinks.EmitResult.FAIL_TERMINATED: the extra completion is basically ignored since there was a previous termination signal, but there is nothing interesting to log.
Sinks.EmitResult.FAIL_NON_SERIALIZED: throw a Sinks.EmissionException mentioning RS spec rule 1.3. Note that Sinks.unsafe() never triggers this result. It would be possible for a Sinks.EmitFailureHandler to busy-loop and optimistically wait for the contention to disappear to avoid this case in safe sinks.
Might throw an unchecked exception as a last resort (e.g. in case of a fatal error downstream which cannot be propagated to any asynchronous handler, a bubbling exception, a Sinks.EmitResult.FAIL_NON_SERIALIZED as described above, ...).
Specified by:
emitComplete in interface Sinks.Many<T>
Parameters:
failureHandler - the failure handler that allows retrying failed Sinks.EmitResult
See Also:
Sinks.Many.tryEmitComplete(), Subscriber.onComplete()
public void emitError(java.lang.Throwable error, Sinks.EmitFailureHandler failureHandler)
Description copied from interface: Sinks.Many
A simplified attempt at failing the sequence via the Sinks.Many.tryEmitError(Throwable) API, generating an onError signal. If the result of the attempt is not a success, implementations SHOULD retry the Sinks.Many.tryEmitError(Throwable) call IF the provided Sinks.EmitFailureHandler returns true. Otherwise, failures are dealt with in a predefined way that might depend on the actual sink implementation (see below for the vanilla reactor-core behavior).
Generally, Sinks.Many.tryEmitError(Throwable) is preferable since it allows a custom handling of error cases, although this implies checking the returned Sinks.EmitResult and correctly acting on it. This API is intended as a good default for convenience.
When the Sinks.EmitResult is not a success, vanilla reactor-core operators have the following behavior:
Sinks.EmitResult.FAIL_OVERFLOW: irrelevant as onError is not driven by backpressure.
Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER: the error is ignored since nobody is listening. Note that most vanilla reactor sinks never trigger this result for onError, replaying the terminal signal to later subscribers instead (with the exception of Sinks.UnicastSpec.onBackpressureError()).
Sinks.EmitResult.FAIL_CANCELLED: the error can be ignored since nobody is interested.
Sinks.EmitResult.FAIL_TERMINATED: the error unexpectedly follows another terminal signal, so it is dropped via Operators.onErrorDropped(Throwable, Context).
Sinks.EmitResult.FAIL_NON_SERIALIZED: throw a Sinks.EmissionException mentioning RS spec rule 1.3. Note that Sinks.unsafe() never triggers this result. It would be possible for a Sinks.EmitFailureHandler to busy-loop and optimistically wait for the contention to disappear to avoid this case in safe sinks.
Might throw an unchecked exception as a last resort (e.g. in case of a fatal error downstream which cannot be propagated to any asynchronous handler, a bubbling exception, a Sinks.EmitResult.FAIL_NON_SERIALIZED as described above, ...).
Specified by:
emitError in interface Sinks.Many<T>
Parameters:
error - the exception to signal, not null
failureHandler - the failure handler that allows retrying failed Sinks.EmitResult
See Also:
Sinks.Many.tryEmitError(Throwable), Subscriber.onError(Throwable)
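As a sketch of how a terminal error interacts with replay, assuming reactor-core 3.4 or later and a sink from Sinks.many().replay().all(): the class name ErrorReplayExample and the event labels are illustrative. A subscriber arriving after the error still receives the buffered element followed by the error, and a second terminal signal is reported as FAIL_TERMINATED.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitFailureHandler;

public class ErrorReplayExample {

    // Fails the sink before anyone subscribes, then records what a late subscriber sees.
    static List<String> lateSubscriberView() {
        Sinks.Many<String> sink = Sinks.many().replay().all();

        sink.emitNext("a", EmitFailureHandler.FAIL_FAST);
        sink.emitError(new IllegalStateException("boom"), EmitFailureHandler.FAIL_FAST);

        List<String> events = new ArrayList<>();
        sink.asFlux().subscribe(
                value -> events.add("next:" + value),
                error -> events.add("error:" + error.getMessage()));

        // The try variant is used here so that the result can be inspected directly
        events.add("second:" + sink.tryEmitError(new IllegalStateException("again")));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(lateSubscriberView()); // prints [next:a, error:boom, second:FAIL_TERMINATED]
    }
}
```

Using emitError for the second signal instead would route the extra error to the dropped-error hook, as described for FAIL_TERMINATED above.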