我对Spring Reactive编程有点陌生。我试图从I/O得到一个Flux,然后返回一个对象列表,以及从我的服务中返回一个Mono要组合什么。
Flux<Obj1> -> Mono<Obj2>
Obj1
{
"a" : "123",
"combine" : "456"
"combine2" : "789"
}
Flux<Obj1> has multiple objects
Obj2
{
"a" : "123"
"combine" : {
"456" : "1"
},
"combine2" : {
"789" : "2"
}
}
Mono<Obj2> is a consolidation of flux with the Combiner keys.
为了实现这一点,我的初始方法是确保通量是在操作数据之后完成的。
Flux.just(obj1a,obj1b,obj1c)
.then();
但是上面的语句返回了Mono的一个空格,不确定在这种情况下,那么许多人如何工作。
我觉得这里缺少了一些东西,我应该如何在完成后控制Flux对象。
为了实现这一点,我的初始方法是确保通量是在操作数据之后完成的。
这是反应性编程的错误思维方式——你需要在数据流过通量时修改数据。然后()
方法将完全忽略通量的结果,并在完成时输出一些其他不相关的Mono
。
如果您想获取某个元素的Flux
,并想将其减少为某个其他元素的Mono
,那么您很可能需要reduce()
方法。在您的情况下,这将需要一个初始的Obj2
,然后是一个BiFunction
,其目的是在通量中获取一个中间的
Obj2
,一个Obj1
,然后生成一个更新的Obj2
。reduce()
操作符随后将对整个流应用此缩减,从而为您提供一个Mono
从您的代码中无法立即看出您想要具体实现什么,但下面是一个相关的示例(为了简洁起见使用lombok):
@Data
@AllArgsConstructor
class Obj1 {
private String a;
private String combine;
private String combine2;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
class Obj2 {
private String a;
private Map<String, Integer> combine = new HashMap<>();
private Map<String, Integer> combine2 = new HashMap<>();
}
public class NewClass {
public static void main(String[] args) {
Flux<Obj1> flux = Flux.just(
new Obj1("123", "456", "789"),
new Obj1("123", "456", "789"),
new Obj1("123", "455", "789"));
Mono<Obj2> mono = flux.reduce(new Obj2(), (o2, o1) -> {
Map<String, Integer> combine = new HashMap<>(o2.getCombine());
combine.put(o1.getCombine(), combine.getOrDefault(o1.getCombine(), 0) + 1);
Map<String, Integer> combine2 = new HashMap<>(o2.getCombine2());
combine2.put(o1.getCombine2(), combine2.getOrDefault(o1.getCombine2(), 0) + 1);
return new Obj2(o1.getA(), combine, combine2);
});
mono.subscribe(System.out::println);
}
}