提问者:小点点

如何用单声道映射通量?


我有两项要求:

Flux<ProductID> getProductIds() {
    return this.webClient.get()
            .uri(PRODUCT_ID_URI)
            .accept(MediaType.APPLICATION_STREAM_JSON)
            .retrieve()
            .bodyToFlux(ProductID.class);
}

Mono<Product> getProduct(String id) {
    return this.itemServiceWebClient.get()
            .uri(uriBuilder -> uriBuilder.path(PRODUCT_URI + "/{id}")
                    .build(id))
            .accept(MediaType.APPLICATION_STREAM_JSON)
            .exchange()
            .flatMap(clientResponse -> clientResponse.bodyToMono(Product.class));
}

有了这些,我想做以下工作:

Flux<Product> getProducts() {
    return Flux.create(sink -> this.gateway.getProductIds()
            .doOnComplete(() -> {
                log.info("PRODUCTS COMPLETE");

                sink.complete();
            })
            .flatMap(productId -> this.getProduct(productId.getID()))
            .subscribe(product -> {
                log.info("NEW PRODUCT: " + product);

                sink.next(product);
            }));
}

当我调用它时,我得到以下输出:

PRODUCTS COMPLETE
NEW PRODUCT: ...
NEW PRODUCT: ...
....

当然,由于异步单声道映射,流在结果实际出现之前就关闭了。我怎样才能保持这个非阻塞,但也确保结果在调用上完成之前到达?


共1个答案

匿名用户

假设getProducts是一个控制器方法,并且您希望在模型中添加这些产品以在视图模板中渲染,您可以这样解决此问题:

@GetMapping("/products")
public String getProducts(Model model) {

    Flux<Product> products = this.gateway.getProductIds()
            .flatMap(productId -> this.getProduct(productId.getID()));
    model.put("products", products);
    // return the view name
    return "showProducts";
}