提问者:小点点

你如何在项目Reactor中实现轮询逻辑?


我有一个方法,它发送请求以获取作业的状态并返回状态,它看起来像这样:

<代码>单声道

JobState对象有一个方法JobStatus. isDone(),它返回挂起的作业是否完成。

有没有办法让我继续订阅单声道,直到JobStatus. isDone()为真?即类似于getJobStatus()的东西。


共2个答案

匿名用户

一种选择是让getJobStatus()Mono仅在作业完成时发出,这可能不一定容易,具体取决于Mono当前的实现方式。

对于轮询,假设Mono在您每次订阅它时轮询,您可以使用中的重复时中的带直到配对:

getJobStatus()
    .repeatWhen(completed -> completed.delayElements(Duration.ofMillis(pollDelay))) //(1)
    .takeUntil(JobStatus::isDone) //(2)
    .last() //(3)

(1)反复重新订阅源Mono(这会产生Flux

(2)返回状态标记为done后立即取消上述重复周期

(3)切换回Mono

匿名用户

当您可能想要轮询直到从发布者那里获得成功或失败时,其他选项将是使用重复性时空。当有一些场景您不想永远等待获得响应,而是想要超时时,您可以使用相同的方法,其中您有自己的逻辑或库的方法来超时操作。

AtomicInteger c = new AtomicInteger();
Mono<String> source = Mono.defer(() -> c.getAndIncrement() < 3 
? Mono.empty() 
: Mono.just("7"));
return source.repeatWhenEmpty(4, Flux::cache);

在上面的示例4中指定了要重试的最大次数。

source.repeatWhenEmpty(exponentialBackOff(Duration.ofMillis(1), 
                Duration.ofMillis(15),
                Duration.ofSeconds(1)));

https://github.com/cloudfoundry/cf-java-client有指数方法。