提问者:小点点

将顺序Monos转换为Flux


我有一个Web服务,我想检索树的元素到根节点。我有一个网络流量接口,它在每次调用时返回一个单声道:

public interface WebService {
    Mono<Node> fetchNode(String nodeId);
}

public class Node {
    public String id;
    public String parentId; // null, if parent node
}

让我们假设有一棵树

    1
  2   3
  4   5

我想创建以下方法:

public interface ParentNodeResolver {

    Flux<Node> getNodeChain(String nodeId);
}

这将给我在getNodeChain(5)上一个通量,节点为5,3和1,然后完成。

不幸的是,我不太明白如何按顺序组合Monos,但不阻止它们。使用流量。generate(),我想我需要在每个mono上阻塞,以检查它是否有下一个元素。我发现的其他方法似乎只结合了固定数量的mono,但不是以这种递归方式。

下面是一个示例代码,它可以模拟网络请求,但有一定的延迟。

public class MonoChaining {
    ExecutorService executorService = Executors.newFixedThreadPool(5);

    @Test
    void name() {
        var nodeChain = generateNodeChainFlux("5")
                .collectList()
                .block();
        assertThat(nodeChain).isNotEmpty();
    }

    private Flux<Node> generateNodeChainFlux(String nodeId) {
        //TODO
        return Flux.empty();


    }

    public Mono<Node> getSingleNode(String nodeId) {
        var future =
                CompletableFuture.supplyAsync(() -> {
                    try {
                        Thread.sleep(2000); // Simulate delay
                        if ("5".equals(nodeId)) {
                            return new Node("5", "3");
                        } else if ("3".equals(nodeId)) {
                            return new Node("3", "1");
                        } else if ("1".equals(nodeId)) {
                            return new Node("1", null);
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return null;
                }, executorService);

        return Mono.fromFuture(future);
    }


    public static class Node {
        public String id;
        public String parentId;

        public Node(String id, String parentId) {
            this.id = id;
            this.parentId = parentId;
        }
    }
}

有办法找回这个吗?谢谢


共2个答案

匿名用户

您要查找的操作员是Mono#expand。它用于递归扩展序列。在这里阅读更多。

就你而言:

private Flux<Node> generateNodeChainFlux(String nodeId) {
        return getSingleNode(nodeId).expand(node -> getSingleNode(node.parentId));
    }

匿名用户

使用带有flatMap的递归来获取父节点,并使用concat将当前节点附加到生成的flux可能有效。请尝试以下代码:

public Flux<Node> getNodeChain(String nodeId) {
    return fetchNode(nodeId).flatMapMany(node -> {
        if (node.parent != null) {
            Flux<Node> nodeChain = getNodeChain(node.parent);
            return Flux.concat(Flux.just(node), nodeChain);
        }
        return Flux.just(node);
    });
}

这里我使用flatMapManyMono转换为Flux