T - the value type.
public class AssertSubscriber<T> extends java.lang.Object implements CoreSubscriber<T>, org.reactivestreams.Subscription
To create a new instance of AssertSubscriber, you have the choice between these static methods:
- create(): creates a new AssertSubscriber that requests an unbounded number of elements.
- create(long): creates a new AssertSubscriber that requests n elements (n can be 0 if you want no initial demand).
If you are testing asynchronous publishers, don't forget to use one of the await*() methods to wait for the data to assert.
You can extend this class, but only onNext, onError and onComplete can be overridden.
You can call request(long) and cancel() from any thread or from within the overridable methods, but you should avoid calling the assertXXX methods asynchronously.
Usage:
AssertSubscriber<String> subscriber = AssertSubscriber.create();
publisher.subscribe(subscriber);
subscriber.await()
.assertValues("ABC", "DEF");
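The difference between an unbounded initial request and create(long) with 0 initial demand followed by request(long) can be illustrated without Reactor on the classpath. The following is a minimal JDK-only analogue built on java.util.concurrent.Flow. RecordingSubscriber and fromList are hypothetical names introduced for this sketch and are not part of AssertSubscriber. The sketch is single-threaded and synchronous.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class DemandSketch {

    /** Hypothetical recording subscriber; a stand-in for illustration, not AssertSubscriber itself. */
    public static final class RecordingSubscriber<T> implements Flow.Subscriber<T> {
        private final long initialRequest;
        private final List<T> values = new ArrayList<>();
        private Flow.Subscription subscription;
        private boolean terminated;

        public RecordingSubscriber(long initialRequest) {
            this.initialRequest = initialRequest;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            if (initialRequest > 0) {
                s.request(initialRequest); // 0 means no initial demand
            }
        }
        @Override public void onNext(T item) { values.add(item); }
        @Override public void onError(Throwable t) { terminated = true; }
        @Override public void onComplete() { terminated = true; }

        public void request(long n) { subscription.request(n); }
        public List<T> values() { return values; }
        public boolean isTerminated() { return terminated; }
    }

    /** Synchronous publisher that emits only as many items as were requested. */
    public static <T> Flow.Publisher<T> fromList(List<T> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int index;
            private boolean done;

            @Override public void request(long n) {
                for (long i = 0; i < n && index < items.size() && !done; i++) {
                    subscriber.onNext(items.get(index++));
                }
                // Complete once every item has been delivered.
                if (n > 0 && index == items.size() && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }
            @Override public void cancel() { done = true; }
        });
    }

    public static void main(String[] args) {
        RecordingSubscriber<String> ts = new RecordingSubscriber<>(0);
        fromList(List.of("ABC", "DEF", "GHI")).subscribe(ts);
        System.out.println(ts.values()); // [] because nothing was requested yet
        ts.request(2);
        System.out.println(ts.values()); // [ABC, DEF]
        ts.request(1);
        System.out.println(ts.values() + " terminated=" + ts.isTerminated()); // [ABC, DEF, GHI] terminated=true
    }
}
```

Starting with zero demand makes it possible to assert the state between requests, which an unbounded request would not allow.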
Modifier and Type | Field and Description |
---|---|
static java.time.Duration | DEFAULT_VALUES_TIMEOUT: Default timeout for waiting for next values to be received. |
Constructor and Description |
---|
AssertSubscriber() |
AssertSubscriber(Context context) |
AssertSubscriber(Context context, long n) |
AssertSubscriber(long n) |
Modifier and Type | Method and Description |
---|---|
AssertSubscriber<T> | assertComplete(): Assert that a successful completion signal has been received. |
AssertSubscriber<T> | assertContainValues(java.util.Set<? extends T> expectedValues): Assert the specified values have been received. |
AssertSubscriber<T> | assertError(): Assert an error signal has been received. |
AssertSubscriber<T> | assertError(java.lang.Class<? extends java.lang.Throwable> clazz): Assert an error signal has been received. |
AssertSubscriber<T> | assertErrorMessage(java.lang.String message) |
AssertSubscriber<T> | assertErrorWith(java.util.function.Consumer<? super java.lang.Throwable> expectation): Assert an error signal has been received. |
AssertSubscriber<T> | assertFuseableSource(): Assert that the upstream was a Fuseable source. |
AssertSubscriber<T> | assertFusionEnabled(): Assert that the fusion mode was granted. |
AssertSubscriber<T> | assertFusionMode(int expectedMode) |
AssertSubscriber<T> | assertFusionRejected(): Assert that the fusion mode was rejected (not granted). |
AssertSubscriber<T> | assertIncomplete(T... values) |
AssertSubscriber<T> | assertNoError(): Assert no error signal has been received. |
AssertSubscriber<T> | assertNoEvents() |
AssertSubscriber<T> | assertNonFuseableSource(): Assert that the upstream was not a Fuseable source. |
AssertSubscriber<T> | assertNotComplete(): Assert no successful completion signal has been received. |
AssertSubscriber<T> | assertNotSubscribed(): Assert no subscription occurred. |
AssertSubscriber<T> | assertNotTerminated(): Assert neither a successful completion nor an error signal has been received. |
AssertSubscriber<T> | assertNoValues(): Assert no values have been received. |
AssertSubscriber<T> | assertSubscribed(): Assert subscription occurred (once). |
AssertSubscriber<T> | assertTerminated(): Assert either a successful completion or an error signal has been received. |
AssertSubscriber<T> | assertValueCount(long n): Assert n values have been received. |
AssertSubscriber<T> | assertValues(T... expectedValues): Assert the specified values have been received in the declared order. |
AssertSubscriber<T> | assertValueSequence(java.lang.Iterable<? extends T> expectedSequence): Assert the specified values have been received in the same order read by the passed Iterable. |
AssertSubscriber<T> | assertValuesWith(java.util.function.Consumer<T>... expectations): Assert the specified values have been received in the declared order. |
AssertSubscriber<T> | await(): Blocking method that waits until a successful completion or error signal is received. |
AssertSubscriber<T> | await(java.time.Duration timeout): Blocking method that waits until a successful completion or error signal is received or until a timeout occurs. |
static void | await(java.time.Duration timeout, java.lang.String errorMessage, java.util.function.BooleanSupplier conditionSupplier): Blocking method that waits until conditionSupplier returns true, or if it does not before the specified timeout, throws an AssertionError with the specified error message. |
static void | await(java.time.Duration timeout, java.util.function.Supplier<java.lang.String> errorMessageSupplier, java.util.function.BooleanSupplier conditionSupplier): Blocking method that waits until conditionSupplier returns true, or if it does not before the specified timeout, throws an AssertionError with the specified error message supplier. |
AssertSubscriber<T> | awaitAndAssertNextValueCount(long n): Blocking method that waits until n next values have been received. |
AssertSubscriber<T> | awaitAndAssertNextValues(T... values): Blocking method that waits until n next values have been received (n is the number of values provided) to assert them. |
AssertSubscriber<T> | awaitAndAssertNextValuesWith(java.util.function.Consumer<T>... expectations): Blocking method that waits until n next values have been received (n is the number of expectations provided) to assert them. |
void | cancel() |
AssertSubscriber<T> | configureValuesStorage(boolean enabled): Enable or disable the values storage. |
AssertSubscriber<T> | configureValuesTimeout(java.time.Duration timeout): Configure the timeout for waiting for next values to be received (3 seconds by default). |
static <T> AssertSubscriber<T> | create(): Create a new AssertSubscriber that requests an unbounded number of elements. |
static <T> AssertSubscriber<T> | create(long n): Create a new AssertSubscriber that initially requests n elements. |
Context | currentContext(): Request a Context from dependent components which can include downstream operators during subscribing or a terminal Subscriber. |
int | establishedFusionMode(): Returns the established fusion mode or -1 if it was not enabled. |
boolean | isTerminated() |
void | onComplete() |
void | onError(java.lang.Throwable t) |
void | onNext(T t) |
void | onSubscribe(org.reactivestreams.Subscription s): Implementors should initialize any state used by Subscriber.onNext(Object) before calling Subscription.request(long). |
void | request(long n) |
AssertSubscriber<T> | requestedFusionMode(int requestMode): Set up the fusion mode that should be requested from the incoming Subscription if it happens to be a QueueSubscription. |
org.reactivestreams.Subscription | upstream() |
java.util.List<T> | values() |
public static final java.time.Duration DEFAULT_VALUES_TIMEOUT
public AssertSubscriber()
public AssertSubscriber(long n)
public AssertSubscriber(Context context)
public AssertSubscriber(Context context, long n)
public static void await(java.time.Duration timeout, java.util.function.Supplier<java.lang.String> errorMessageSupplier, java.util.function.BooleanSupplier conditionSupplier)
Blocking method that waits until conditionSupplier returns true, or if it does not before the specified timeout, throws an AssertionError with the specified error message supplier.
timeout - the timeout duration
errorMessageSupplier - the error message supplier
conditionSupplier - condition to break out of the wait loop
Throws: java.lang.AssertionError
public static void await(java.time.Duration timeout, java.lang.String errorMessage, java.util.function.BooleanSupplier conditionSupplier)
Blocking method that waits until conditionSupplier returns true, or if it does not before the specified timeout, throws an AssertionError with the specified error message.
timeout - the timeout duration
errorMessage - the error message
conditionSupplier - condition to break out of the wait loop
Throws: java.lang.AssertionError
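The contract of the two static await overloads (wait until a condition holds, otherwise fail with an AssertionError after the timeout) can be sketched with a plain polling loop. This is a JDK-only illustration under stated assumptions, not the library's implementation: the class name AwaitSketch and the 10 ms polling interval are hypothetical, and the real method may wait differently.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

public class AwaitSketch {

    /** Polls the condition until it is true or the timeout elapses (assumed strategy). */
    public static void await(Duration timeout,
                             Supplier<String> errorMessageSupplier,
                             BooleanSupplier conditionSupplier) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!conditionSupplier.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                // The message is only computed when the wait actually fails.
                throw new AssertionError(errorMessageSupplier.get());
            }
            try {
                Thread.sleep(10); // assumed polling interval
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("interrupted while waiting", e);
            }
        }
    }

    /** Convenience overload taking a fixed message. */
    public static void await(Duration timeout, String errorMessage, BooleanSupplier conditionSupplier) {
        await(timeout, () -> errorMessage, conditionSupplier);
    }

    public static void main(String[] args) {
        AtomicBoolean ready = new AtomicBoolean(false);
        // Flip the flag from another thread after a short delay.
        new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
            ready.set(true);
        }).start();

        await(Duration.ofSeconds(2), "flag was never set", ready::get);
        System.out.println("condition met");

        try {
            await(Duration.ofMillis(100), () -> "condition never became true", () -> false);
        } catch (AssertionError e) {
            System.out.println("failed as expected: " + e.getMessage());
        }
    }
}
```

The supplier-based overload is useful when building the failure message is expensive or depends on state that is only known at the time of failure.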
public static <T> AssertSubscriber<T> create()
Create a new AssertSubscriber that requests an unbounded number of elements.
Be sure a publisher has subscribed to it via Publisher.subscribe(Subscriber) before using the assert methods.
T - the observed value type
public static <T> AssertSubscriber<T> create(long n)
Create a new AssertSubscriber that initially requests n elements. You can then manage the demand with Subscription.request(long).
Be sure a publisher has subscribed to it via Publisher.subscribe(Subscriber) before using the assert methods.
T - the observed value type
n - number of elements to request (can be 0 if you want no initial demand).
public final AssertSubscriber<T> configureValuesStorage(boolean enabled)
enabled - enable value storage?
public final AssertSubscriber<T> configureValuesTimeout(java.time.Duration timeout)
timeout - the new default value timeout duration
public final int establishedFusionMode()
public final AssertSubscriber<T> assertComplete()
public final AssertSubscriber<T> assertContainValues(java.util.Set<? extends T> expectedValues)
expectedValues - the values to assert
See also: configureValuesStorage(boolean)
public final AssertSubscriber<T> assertError()
public final AssertSubscriber<T> assertError(java.lang.Class<? extends java.lang.Throwable> clazz)
clazz - the class of the exception contained in the error signal
public final AssertSubscriber<T> assertErrorMessage(java.lang.String message)
public final AssertSubscriber<T> assertErrorWith(java.util.function.Consumer<? super java.lang.Throwable> expectation)
expectation - a method that can verify the exception contained in the error signal and throw an exception (like an AssertionError) if the exception is not valid.
public final AssertSubscriber<T> assertFuseableSource()
public final AssertSubscriber<T> assertFusionEnabled()
public final AssertSubscriber<T> assertFusionMode(int expectedMode)
public final AssertSubscriber<T> assertFusionRejected()
public final AssertSubscriber<T> assertNoError()
public final AssertSubscriber<T> assertNoValues()
public final AssertSubscriber<T> assertNonFuseableSource()
public final AssertSubscriber<T> assertNotComplete()
public final AssertSubscriber<T> assertNotSubscribed()
public final AssertSubscriber<T> assertNotTerminated()
public final AssertSubscriber<T> assertSubscribed()
public final AssertSubscriber<T> assertTerminated()
public final AssertSubscriber<T> assertValueCount(long n)
Assert n values have been received.
n - the expected value count
public final AssertSubscriber<T> assertValueSequence(java.lang.Iterable<? extends T> expectedSequence)
Assert the specified values have been received in the same order read by the passed Iterable. Values storage should be enabled to use this method.
expectedSequence - the values to assert
See also: configureValuesStorage(boolean)
@SafeVarargs public final AssertSubscriber<T> assertValues(T... expectedValues)
expectedValues - the values to assert
See also: configureValuesStorage(boolean)
@SafeVarargs public final AssertSubscriber<T> assertValuesWith(java.util.function.Consumer<T>... expectations)
expectations - one or more methods that can verify the values and throw an exception (like an AssertionError) if the value is not valid.
See also: configureValuesStorage(boolean)
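The idea behind expectation-based assertions, where each Consumer verifies one received value in order and throws if the value is not valid, can be sketched with plain JDK types. The helper below is a hypothetical illustration operating on a List of already received values. It is not the library's code, and the value-count check is an assumption about the intended semantics.

```java
import java.util.List;
import java.util.function.Consumer;

public class ValuesWithSketch {

    /** Hypothetical helper: applies the i-th expectation to the i-th received value. */
    @SafeVarargs
    public static <T> void assertValuesWith(List<T> received, Consumer<T>... expectations) {
        // Assumed behavior: the number of expectations must match the number of values.
        if (received.size() != expectations.length) {
            throw new AssertionError("Different value count: received = " + received.size()
                    + ", expected = " + expectations.length);
        }
        for (int i = 0; i < expectations.length; i++) {
            // Each expectation signals failure by throwing, for example an AssertionError.
            expectations[i].accept(received.get(i));
        }
    }

    public static void main(String[] args) {
        List<String> received = List.of("ABC", "DEF");
        assertValuesWith(received,
                v -> { if (!v.startsWith("A")) throw new AssertionError("unexpected first value: " + v); },
                v -> { if (v.length() != 3) throw new AssertionError("unexpected length: " + v); });
        System.out.println("all expectations passed");
    }
}
```

Compared with comparing values by equality, expectations allow partial checks such as a prefix, a range, or a single field of a larger object.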
public final AssertSubscriber<T> await()
public final AssertSubscriber<T> await(java.time.Duration timeout)
timeout - the timeout value
public final AssertSubscriber<T> awaitAndAssertNextValueCount(long n)
Blocking method that waits until n next values have been received.
n - the value count to assert
@SafeVarargs public final AssertSubscriber<T> awaitAndAssertNextValues(T... values)
Blocking method that waits until n next values have been received (n is the number of values provided) to assert them.
values - the values to assert
@SafeVarargs public final AssertSubscriber<T> awaitAndAssertNextValuesWith(java.util.function.Consumer<T>... expectations)
Blocking method that waits until n next values have been received (n is the number of expectations provided) to assert them.
expectations - one or more methods that can verify the values and throw an exception (like an AssertionError) if the value is not valid.
public void cancel()
Specified by: cancel in interface org.reactivestreams.Subscription
public final boolean isTerminated()
public void onComplete()
Specified by: onComplete in interface org.reactivestreams.Subscriber<T>
public void onError(java.lang.Throwable t)
Specified by: onError in interface org.reactivestreams.Subscriber<T>
public void onSubscribe(org.reactivestreams.Subscription s)
Description copied from interface: CoreSubscriber
Implementors should initialize any state used by Subscriber.onNext(Object) before calling Subscription.request(long). Should further onNext related state modification occur, thread-safety will be required.
Note that an invalid request <= 0 will not produce an onError and will simply be ignored or reported through a debug-enabled Logger.
Specified by: onSubscribe in interface org.reactivestreams.Subscriber<T>
Specified by: onSubscribe in interface CoreSubscriber<T>
public void request(long n)
Specified by: request in interface org.reactivestreams.Subscription
@Nonnull public Context currentContext()
Description copied from interface: CoreSubscriber
Request a Context from dependent components which can include downstream operators during subscribing or a terminal Subscriber.
Specified by: currentContext in interface CoreSubscriber<T>
Returns: a resolved context or Context.empty()
public final AssertSubscriber<T> requestedFusionMode(int requestMode)
requestMode - the mode to request, see Fuseable constants
public org.reactivestreams.Subscription upstream()
public java.util.List<T> values()
public final AssertSubscriber<T> assertNoEvents()
@SafeVarargs public final AssertSubscriber<T> assertIncomplete(T... values)