我正在使用spring webflux通过Schudters.elastic()从另一个服务调用另一个服务。
Mono<Integer> anaNotificationCountObservable = wrapWithRetryForFlux(wrapWithTimeoutForFlux(
notificationServiceMediatorFlux.getANANotificationCountForUser(userId).subscribeOn(reactor.core.scheduler.Schedulers.elastic())
)).onErrorReturn(0);
在主线程中,我设置了一个InhertitableThreadlocal变量,在子线程中,我试图访问它,它工作正常。
这是我的线程本地存储类
@Component
public class RequestCorrelation {
public static final String CORRELATION_ID = "correlation-id";
private InheritableThreadLocal<String> id = new InheritableThreadLocal<>();
public String getId() {
return id.get();
}
public void setId(final String correlationId) {
id.set(correlationId);
}
public void removeCorrelationId() {
id.remove();
}
}
现在的问题是它第一次工作正常,这意味着我在threadlocal中设置的值被传递给了其他服务。
但第二次,它也使用了旧id(在上次请求中生成)。
我尝试使用Schedulers.newSingle()而不是弹性(),那么它的工作正常。所以想想既然弹性()是重用线程,这就是为什么它不能清除/或它是重用的。
我应该如何解决问题。我在我的过滤器中设置线程本地,并在我的文件中清除相同的内容
requestCorrelation.setId(UUID.randomUUID().toString());
chain.doFilter(req,res)
requestCorrelation.removeCorrelationId();
在利用反应器管道时,切勿将资源或信息绑定到特定线程。Reactor本身是调度不可知的;使用您的库的开发人员可以选择在另一个调度程序上安排工作 - 如果您决定强制使用调度模型,则可能会失去性能优势。
相反,您可以在Reactor上下文中存储数据。这是一个类似地图的结构,与订阅者相关联,独立于调度安排。
这就是spring security和micrometer等项目存储通常属于本地线程的信息的方式。