提问者:小点点

在春暖花开中创造永不停息的热流


我有一个提供数据的消费者。数据被消费和处理-不是反应性的。然后我获取了这些数据并将其发送到:

Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);

我正在用

sink.emitNext(
    message, retryOn(Sinks.EmitFailureHandler.FAIL_FAST, message));

正如其他堆栈帖子中建议的那样,我正在处理

Sinks.EmitResult.FAIL_NON_SERIALIZED

仍然停留在reector. core.Exceptions$Overflow Exception:Sink期间的背压溢出。许多#emitNext

前端有订阅者EventSource。我的GETendpoint(简化,在这里尝试了很多东西)

@GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  public Flux<ServerSentEvent<Message>> streamData() {
    return service
        .getSink()
        .asFlux()
        .map(e -> ServerSentEvent.builder(e).event(e.getType().getId()).build());
  }

我在其他帖子中尝试过建议。但我显然做不到:永不结束的热点事件流,其中订阅者一旦连接就不会错过任何消息。

代码是受欢迎的,但意见更重要。

回顾:永不结束的热点事件源,订阅者不会丢失任何消息。

编辑:为什么建议的帖子对我不起作用:我缺少消费者(队列的)-接收器-获取endpoint之间的部分:仍然以reector. core结尾。例外$Overflow异常:Sinks期间的背压溢出。许多#emitNext


共1个答案

匿名用户

消费者从队列接收数据。数据是非反应性处理的。发送到…的结果数据应在前端的EventSource中接收。

使用接收器发送处理后的数据:

sink = Sinks.many().multicast().directAllOrNothing();
sink.emitNext(
   ServerSentEvent.builder(message).event(message.getType().getId()).build(),
   retryOnNonSerialized(Sinks.EmitFailureHandler.FAIL_FAST,message)
);

控制器:

@PostConstruct
private void loadFlux() {
   log.info("Constructing Flux....");
   flux = service
      .getSink()
      .asFlux()
      .publishOn(Schedulers.boundedElastic())
      .onBackpressureBuffer()
      .onBackpressureDrop(message -> blog.debug("[STREAM] Backpressure message drop: {}", message))
      .share();
}
@GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Message>> streamData() {
   return fluxData;
}

对我来说,这是永无止境的热流。