提问者:小点点

减少流量以形成单体


我对Spring Reactive编程有点陌生。我试图从I/O得到一个Flux,然后返回一个对象列表,以及从我的服务中返回一个Mono要组合什么。

Flux<Obj1> -> Mono<Obj2>

Obj1
{
"a" : "123",
"combine" : "456"
"combine2" : "789"
}

Flux<Obj1> has multiple objects

Obj2
{
"a" : "123"
"combine" : {
              "456" : "1"
            },
"combine2" : {
               "789" : "2"
             }
}

Mono<Obj2> is a consolidation of flux with the Combiner keys.

为了实现这一点,我的初始方法是确保通量是在操作数据之后完成的。

Flux.just(obj1a,obj1b,obj1c)
    .then();

但是上面的语句返回了Mono的一个空格,不确定在这种情况下,那么许多人如何工作。

我觉得这里缺少了一些东西,我应该如何在完成后控制Flux对象。


共1个答案

匿名用户

为了实现这一点,我的初始方法是确保通量是在操作数据之后完成的。

这是反应性编程的错误思维方式——你需要在数据流过通量时修改数据。然后()方法将完全忽略通量的结果,并在完成时输出一些其他不相关的Mono

如果您想获取某个元素的Flux,并想将其减少为某个其他元素的Mono,那么您很可能需要reduce()方法。在您的情况下,这将需要一个初始的Obj2,然后是一个BiFunction,其目的是在通量中获取一个中间的Obj2,一个Obj1,然后生成一个更新的Obj2reduce()操作符随后将对整个流应用此缩减,从而为您提供一个Mono

从您的代码中无法立即看出您想要具体实现什么,但下面是一个相关的示例(为了简洁起见使用lombok):

@Data
@AllArgsConstructor
class Obj1 {

    private String a;
    private String combine;
    private String combine2;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
class Obj2 {

    private String a;
    private Map<String, Integer> combine = new HashMap<>();
    private Map<String, Integer> combine2 = new HashMap<>();

}

public class NewClass {

    public static void main(String[] args) {
        Flux<Obj1> flux = Flux.just(
                new Obj1("123", "456", "789"),
                new Obj1("123", "456", "789"),
                new Obj1("123", "455", "789"));

        Mono<Obj2> mono = flux.reduce(new Obj2(), (o2, o1) -> {
            Map<String, Integer> combine = new HashMap<>(o2.getCombine());
            combine.put(o1.getCombine(), combine.getOrDefault(o1.getCombine(), 0) + 1);
            Map<String, Integer> combine2 = new HashMap<>(o2.getCombine2());
            combine2.put(o1.getCombine2(), combine2.getOrDefault(o1.getCombine2(), 0) + 1);
            return new Obj2(o1.getA(), combine, combine2);
        });

        mono.subscribe(System.out::println);
    }

}