提问者:小点点

反应器-理解线程池。


我试图理解响应式编程是如何真正工作的。为此,我准备了简单的演示:来自Spring Framework的reactiveWebClient向简单的rest api发送请求,该客户端在每次操作中打印线程名称。

rest api:

@RestController
@SpringBootApplication
public class RestApiApplication {

    public static void main(String[] args) {
        SpringApplication.run(RestApiApplication.class, args);
    }

    @PostMapping("/resource")
    public void consumeResource(@RequestBody Resource resource) {
        System.out.println(String.format("consumed resource: %s", resource.toString()));
    }
}

@Data
@AllArgsConstructor
class Resource {
    private final Long id;
    private final String name;
}

和最重要的-响应式Web客户端:

@SpringBootApplication
public class ReactorWebclientApplication {

    public static void main(String[] args) {
        SpringApplication.run(ReactorWebclientApplication.class, args);
    }

    private final TcpClient tcpClient = TcpClient.create();

    private final WebClient webClient = WebClient.builder()
        .clientConnector(new ReactorClientHttpConnector(HttpClient.from(tcpClient)))
        .baseUrl("http://localhost:8080")
        .build();

    @PostConstruct
    void doRequests() {
        var longs = LongStream.range(1L, 10_000L)
            .boxed()
            .toArray(Long[]::new);

        var longsStream = Stream.of(longs);

        Flux.fromStream(longsStream)
            .map(l -> {
                System.out.println(String.format("------- map [%s] --------", Thread.currentThread().getName()));
                return new Resource(l, String.format("name %s", l));
            })
            .filter(res -> {
                System.out.println(String.format("------- filter [%s] --------", Thread.currentThread().getName()));
                return !res.getId().equals(11_000L);
            })
            .flatMap(res -> {
                System.out.println(String.format("------- flatmap [%s] --------", Thread.currentThread().getName()));
                return webClient.post()
                    .uri("/resource")
                    .syncBody(res)
                    .header("Content-Type", "application/json")
                    .header("Accept", "application/json")
                    .retrieve()
                    .bodyToMono(Resource.class)
                    .doOnSuccess(ignore -> System.out.println(String.format("------- onsuccess [%s] --------", Thread.currentThread().getName())))
                    .doOnError(ignore -> System.out.println(String.format("------- onerror [%s] --------", Thread.currentThread().getName())));
            })
            .blockLast();
    }

}

@JsonIgnoreProperties(ignoreUnknown = true)
class Resource {
    private final Long id;
    private final String name;

    @JsonCreator
    Resource(@JsonProperty("id") Long id, @JsonProperty("name")  String name) {
        this.id = id;
        this.name = name;
    }

    Long getId() {
        return id;
    }

    String getName() {
        return name;
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("Resource{");
        sb.append("id=").append(id);
        sb.append(", name='").append(name).append('\'');
        sb.append('}');
        return sb.toString();
    }
}

问题是行为与我预测的不同。

我期望. map().filter().flitMap()的每个调用都将在main线程上执行,并且.doOn成功().doOnError的每个调用都将在nio线程池的线程上执行。所以我期望日志如下所示:

------- map [main] --------
------- filter [main] --------
------- flatmap [main] --------
(and so on...)
------- onsuccess [reactor-http-nio-2] --------
(and so on...)

但我得到的日志是:

------- map [main] --------
------- filter [main] --------
------- flatmap [main] --------
------- map [main] --------
------- filter [main] --------
------- flatmap [main] --------
------- onsuccess [reactor-http-nio-2] --------
------- onsuccess [reactor-http-nio-6] --------
------- onsuccess [reactor-http-nio-4] --------
------- onsuccess [reactor-http-nio-8] --------
------- map [reactor-http-nio-2] --------
------- filter [reactor-http-nio-2] --------
------- flatmap [reactor-http-nio-2] --------
------- map [reactor-http-nio-2] --------

接下来登录. map().filter().plitMap()都是在reactor-超文本传输协议-nio的线程上完成的。

接下来不可理解的事实是,在主线程执行的操作和reactor-超文本传输协议-nio之间的比率总是不同的,有时所有的操作. map().filter().plitMap()都是在主线程执行的。


共1个答案

匿名用户

Retor与RxJava一样,可以被认为与并发无关。也就是说,它不强制执行并发模型。相反,它让您(开发人员)来指挥。然而,这并不妨碍库帮助您解决并发问题。

获取FluxMono并不一定意味着它在专用的Thread中运行。相反,大多数运算符继续在前一个运算符执行的Thread中工作。除非指定,否则最顶层的运算符(源)本身运行在进行订阅()调用的Thread上。

项目Reactor相关留档可以在这里找到。

从您的代码中,以下片段:

webClient.post()
         .uri("/resource")
         .syncBody(res)
         .header("Content-Type", "application/json")
         .header("Accept", "application/json")
         .retrieve()
         .bodyToMono(Resource.class)

导致线程从main切换到netty的worker池。之后,所有以下操作都由netty worker线程执行。

如果你想控制这种行为,你应该在你的代码中添加一个PublishOn(…)语句,例如:

webClient.post()
         .uri("/resource")
         .syncBody(res)
         .header("Content-Type", "application/json")
         .header("Accept", "application/json")
         .retrieve()
         .bodyToMono(Resource.class)
         .publishOn(Schedulers.elastic())

这样,任何后续动作都将由弹性调度器线程池执行。

另一个例子是在请求执行之后使用专用调度程序来执行HTTP繁重任务。

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;

import com.github.tomakehurst.wiremock.WireMockServer;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import ru.lanwen.wiremock.ext.WiremockResolver;
import ru.lanwen.wiremock.ext.WiremockResolver.Wiremock;
import ru.lanwen.wiremock.ext.WiremockUriResolver;
import ru.lanwen.wiremock.ext.WiremockUriResolver.WiremockUri;

@ExtendWith({
  WiremockResolver.class,
  WiremockUriResolver.class
})
public class ReactiveThreadsControlTest {

  private static int concurrency = 1;

  private final WebClient webClient = WebClient.create();

  @Test
  public void slowServerResponsesTest(@Wiremock WireMockServer server, @WiremockUri String uri) {

    String requestUri = "/slow-response";

    server.stubFor(get(urlEqualTo(requestUri))
      .willReturn(aResponse().withStatus(200)
        .withFixedDelay((int) TimeUnit.SECONDS.toMillis(2)))
    );

    Flux
      .generate(() -> Integer.valueOf(1), (i, sink) -> {
        System.out.println(String.format("[%s] Emitting next value: %d", Thread.currentThread().getName(), i));
        sink.next(i);
        return i + 1;
      })
      .subscribeOn(Schedulers.single())
      .flatMap(i ->
          executeGet(uri + requestUri)
            .publishOn(Schedulers.elastic())
            .map(response -> {
              heavyTask();
              return true;
            })
        , concurrency)
      .subscribe();

    blockForever();
  }

  private void blockForever() {
    Object monitor = new Object();

    synchronized (monitor) {
      try {
        monitor.wait();
      } catch (InterruptedException ex) {
      }
    }
  }


  private Mono<ClientResponse> executeGet(String path) {
    System.out.println(String.format("[%s] About to execute an HTTP GET request: %s", Thread.currentThread().getName(), path));
    return webClient
      .get()
      .uri(path)
      .exchange();
  }

  private void heavyTask() {
    try {
      System.out.println(String.format("[%s] About to execute a heavy task", Thread.currentThread().getName()));
      Thread.sleep(TimeUnit.SECONDS.toMillis(20));
    } catch (InterruptedException ex) {
    }
  }
}

相关问题