// First subscriber
hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));
hotSource.onNext("blue");
hotSource.onNext("green");

// Second subscriber
hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: "+d));
hotSource.onNext("orange");
hotSource.onNext("purple");
hotSource.onComplete();
}

// Subscriber 1 to Hot Source: BLUE
// Subscriber 1 to Hot Source: GREEN
// Subscriber 1 to Hot Source: ORANGE
// Subscriber 2 to Hot Source: ORANGE
// Subscriber 1 to Hot Source: PURPLE
// Subscriber 2 to Hot Source: PURPLE
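The hot-source behaviour in the fragment above can be imitated without Reactor. What follows is a minimal sketch in plain Java, not Reactor's implementation: `HotSourceSketch` and `demo()` are hypothetical names introduced only for illustration, and the upper-casing step is an assumption inferred from the upper-case output shown above. The sketch pushes each value only to the subscribers registered at the moment of emission, so a late subscriber never sees earlier values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Consumer;

// Hypothetical stand-in for a hot source (NOT a Reactor class): values are
// delivered to whoever is subscribed right now and are never replayed.
final class HotSourceSketch {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    // Register a subscriber; it only receives values emitted after this call.
    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Emit one value, upper-cased (assumed mapping), to all current subscribers.
    void onNext(String value) {
        String upper = value.toUpperCase(Locale.ROOT);
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(upper);
        }
    }

    // Replays the scenario from the fragment above and collects the lines
    // each subscriber would have printed, in delivery order.
    static List<String> demo() {
        List<String> received = new ArrayList<>();
        HotSourceSketch hotSource = new HotSourceSketch();

        // First subscriber
        hotSource.subscribe(d -> received.add("Subscriber 1 to Hot Source: " + d));
        hotSource.onNext("blue");
        hotSource.onNext("green");

        // Second subscriber joins late and misses "blue" and "green"
        hotSource.subscribe(d -> received.add("Subscriber 2 to Hot Source: " + d));
        hotSource.onNext("orange");
        hotSource.onNext("purple");
        return received;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Running `main` prints six lines in the same order as the output listed above: the second subscriber appears only for ORANGE and PURPLE.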
Caused by: java.lang.IndexOutOfBoundsException: Source emitted more than one item
    at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:134)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onNext(FluxFilterFuseable.java:118)
    at reactor.core.publisher.FluxArray$ArrayConditionalSubscription.fastPath(FluxArray.java:340)
    at reactor.core.publisher.FluxArray$ArrayConditionalSubscription.request(FluxArray.java:263)
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.request(FluxFilterFuseable.java:191)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
    at reactor.core.publisher.MonoSingle$SingleSubscriber.doOnRequest(MonoSingle.java:103)
    at reactor.core.publisher.Operators$MonoInnerProducerBase.request(Operators.java:2841)
    at reactor.core.publisher.LambdaMonoSubscriber.onSubscribe(LambdaMonoSubscriber.java:121)
    at reactor.core.publisher.MonoSingle$SingleSubscriber.onSubscribe(MonoSingle.java:115)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onSubscribe(FluxFilterFuseable.java:87)
    at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:50)
    at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4716)
    at reactor.core.publisher.Mono.subscribeWith(Mono.java:4784)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4544)
    at reactor.core.publisher.MonoBlockTest.test3(MonoBlockTest.java:66)
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IndexOutOfBoundsException: Source emitted more than one item
Caused by: java.lang.IndexOutOfBoundsException: Source emitted more than one item
    at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:134)⑴
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: ⑵
Assembly trace from producer [reactor.core.publisher.MonoSingle] :⑶
    reactor.core.publisher.Flux.single(Flux.java:8203)
    reactor.core.publisher.MonoBlockTest.test3(MonoBlockTest.java:65)
Error has been observed at the following site(s):⑷
    *__Flux.single ⇢ at reactor.core.publisher.MonoBlockTest.test3(MonoBlockTest.java:65)⑸
Original Stack Trace:⑹
        at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:134)⑺
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
        at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onNext(FluxFilterFuseable.java:118)
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IndexOutOfBoundsException: Source emitted more than one item
Caused by: java.lang.IndexOutOfBoundsException: Source emitted more than one item
    at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:134)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
    *__checkpoint ⇢ 2
Original Stack Trace:
        at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:134)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
        at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onNext(FluxFilterFuseable.java:118)
Test class reactor.tools.agent.ReactorDebugJavaAgentTest
[ERROR] (Test worker) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IndexOutOfBoundsException: Source emitted more than one item
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IndexOutOfBoundsException: Source emitted more than one item
Caused by: java.lang.IndexOutOfBoundsException: Source emitted more than one item
    at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:134)
    Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below:
Assembly trace from producer [reactor.core.publisher.MonoSingle] :
    reactor.core.publisher.Flux.single
    reactor.tools.agent.ReactorDebugJavaAgentTest.test(ReactorDebugJavaAgentTest.java:62)
Error has been observed at the following site(s):
    *__Flux.single ⇢ at reactor.tools.agent.ReactorDebugJavaAgentTest.test(ReactorDebugJavaAgentTest.java:62) // root
Original Stack Trace:
        at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:134)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
        at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onNext(FluxFilterFuseable.java:118)
Test class reactor.core.publisher.MonoBlockTest
00:08:19.834 [Test worker] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArrayConditionalSubscription)
00:08:19.846 [Test worker] INFO reactor.Flux.Array.1 - | request(unbounded)
00:08:19.847 [Test worker] INFO reactor.Flux.Array.1 - | onNext(hello)
00:08:19.848 [Test worker] INFO reactor.Flux.Array.1 - | onNext(world)
00:08:19.848 [Test worker] INFO reactor.Flux.Array.1 - | cancel() // the stream is cancelled after the second element is received
00:08:19.860 [Test worker] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IndexOutOfBoundsException: Source emitted more than one item
Caused by: java.lang.IndexOutOfBoundsException: Source emitted more than one item
    at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:134)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onNext(FluxFilterFuseable.java:118)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onNext(FluxPeekFuseable.java:503)