我是响应式编程的新手,并创建了我自己的第一个Mutiny运算符,如https://smallrye.io/smallrye-mutiny/guides/plug.中所述
我的操作员是一个收集器,它收集列表中的x个项目,并在到达x或流完成时发送项目。
public class ListCollectorUntil<T> extends AbstractMultiOperator<T, List<T>> {
ListCollectorUntil(Multi<? extends T> upstream) {
super(upstream);
}
public ListCollectorUntil(Multi<? extends T> upstream, int count) {
this(upstream);
this.count = count;
}
private int count;
private final List<T> bufferList = new ArrayList<>();
private final List<T> tempBufferList = new ArrayList<>();
@Override
public void subscribe(MultiSubscriber<? super List<T>> downstream) {
upstream.subscribe().withSubscriber(new ListCollectorProcessor(downstream));
}
private class ListCollectorProcessor extends MultiOperatorProcessor<T, List<T>> {
public ListCollectorProcessor(MultiSubscriber<? super List<T>> downstream) {
super(downstream);
}
@Override
public void onItem(T item) {
bufferList.add(item);
if (bufferList.size() >= count) {
this.downstream.onItem(getUni());
}
}
@Override
public void onCompletion() {
Subscription subscription = this.upstream.getAndSet(Subscriptions.CANCELLED);
if (subscription != Subscriptions.CANCELLED) {
this.downstream.onItem(getUni());
this.downstream.onCompletion();
}
}
private List<T> getUni() {
tempBufferList.clear();
tempBufferList.addAll(bufferList);
bufferList.clear();
return tempBufferList;
}
}
当我开始使用以下代码对其进行测试时:
void buffer_after5Inputs_uniShouldBeReturned() {
Multi<List<Integer>> multi = Multi.createFrom().items(1, 2, 3, 4, 5, 6)
.plug( integerMulti -> new Buffer<>(integerMulti, 4));
multi.subscribe().with(
integerList -> System.out.println(integerList),
failur -> System.out.println(failur),
() -> System.out.println("completed")
);
AssertSubscriber<List<Integer>> assertSubscriber =
multi.subscribe().withSubscriber(AssertSubscriber.create(2))
.assertCompleted()
.assertItems(Arrays.asList(1, 2, 3, 4), Arrays.asList(5, 6));
}
从我的第一个订阅者那里,我得到:[1,2,3,4][5,6]完成了,但是断言订阅者接缝没有事件,既没有onItem也没有on完成我在哪里犯了错误吗?
过了一会儿,我想明白了我犯的哪个错误:
我没有正确使用AssertSubscriber:create(int)描述了从发射器请求了多少事件。在我的情况下,我需要6个事件
我没有对我的列表进行深度复制,而是使用我的新值更改为以前的列表。这种接缝就像,事件没有正确记录,但是事件指向的列表的值发生了变化。因此,显示了正确的值。我将代码从:
private List<T> getUni() {
tempBufferList.clear();
tempBufferList.addAll(bufferList);
bufferList.clear();
return tempBufferList;
到
private List<T> getUni() {
List<T> tempBufferList = new ArrayList<>(bufferList);
bufferList = new ArrayList<>();
return tempBufferList;
}
我的List缓冲区工作正常,测试运行正确。