提问者:小点点

项目Reactor调度器使用旧的threadlocal值是弹性的


我正在使用spring webflux通过Schudters.elastic()从另一个服务调用另一个服务。

Mono<Integer> anaNotificationCountObservable = wrapWithRetryForFlux(wrapWithTimeoutForFlux(
                notificationServiceMediatorFlux.getANANotificationCountForUser(userId).subscribeOn(reactor.core.scheduler.Schedulers.elastic())
        )).onErrorReturn(0);

在主线程中,我设置了一个InhertitableThreadlocal变量,在子线程中,我试图访问它,它工作正常。

这是我的线程本地存储类

@Component
public class RequestCorrelation {

    public static final String CORRELATION_ID = "correlation-id";

    private InheritableThreadLocal<String> id = new InheritableThreadLocal<>();

    public String getId() { 
       return id.get(); 
    }

    public void setId(final String correlationId) { 
       id.set(correlationId); 
    }

    public void removeCorrelationId() {
       id.remove();
    }
}

现在的问题是它第一次工作正常,这意味着我在threadlocal中设置的值被传递给了其他服务。

但第二次,它也使用了旧id(在上次请求中生成)。

我尝试使用Schedulers.newSingle()而不是弹性(),那么它的工作正常。所以想想既然弹性()是重用线程,这就是为什么它不能清除/或它是重用的。

我应该如何解决问题。我在我的过滤器中设置线程本地,并在我的文件中清除相同的内容

requestCorrelation.setId(UUID.randomUUID().toString());
chain.doFilter(req,res)
requestCorrelation.removeCorrelationId();

共1个答案

匿名用户

在利用反应器管道时,切勿将资源或信息绑定到特定线程。Reactor本身是调度不可知的;使用您的库的开发人员可以选择在另一个调度程序上安排工作 - 如果您决定强制使用调度模型,则可能会失去性能优势。

相反,您可以在Reactor上下文中存储数据。这是一个类似地图的结构,与订阅者相关联,独立于调度安排。

这就是spring security和micrometer等项目存储通常属于本地线程的信息的方式。