我有一个提供数据的消费者。数据被消费和处理-不是反应性的。然后我获取了这些数据并将其发送到:
Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
我正在用
sink.emitNext(
message, retryOn(Sinks.EmitFailureHandler.FAIL_FAST, message));
正如其他堆栈帖子中建议的那样,我正在处理
Sinks.EmitResult.FAIL_NON_SERIALIZED
仍然停留在reector. core.Exceptions$Overflow Exception:Sink期间的背压溢出。许多#emitNext
前端有订阅者EventSource。我的GETendpoint(简化,在这里尝试了很多东西)
@GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Message>> streamData() {
return service
.getSink()
.asFlux()
.map(e -> ServerSentEvent.builder(e).event(e.getType().getId()).build());
}
我在其他帖子中尝试过建议。但我显然做不到:永不结束的热点事件流,其中订阅者一旦连接就不会错过任何消息。
代码是受欢迎的,但意见更重要。
回顾:永不结束的热点事件源,订阅者不会丢失任何消息。
编辑:为什么建议的帖子对我不起作用:我缺少消费者(队列的)-接收器-获取endpoint之间的部分:仍然以reector. core结尾。例外$Overflow异常:Sinks期间的背压溢出。许多#emitNext
消费者从队列接收数据。数据是非反应性处理的。发送到…的结果数据应在前端的EventSource中接收。
使用接收器发送处理后的数据:
sink = Sinks.many().multicast().directAllOrNothing();
sink.emitNext(
ServerSentEvent.builder(message).event(message.getType().getId()).build(),
retryOnNonSerialized(Sinks.EmitFailureHandler.FAIL_FAST,message)
);
控制器:
@PostConstruct
private void loadFlux() {
log.info("Constructing Flux....");
flux = service
.getSink()
.asFlux()
.publishOn(Schedulers.boundedElastic())
.onBackpressureBuffer()
.onBackpressureDrop(message -> blog.debug("[STREAM] Backpressure message drop: {}", message))
.share();
}
@GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Message>> streamData() {
return fluxData;
}
对我来说,这是永无止境的热流。