提问者:小点点

在rxjava中订阅消费者?


我在代码中替换了对Consumer的操作调用,但在订阅时,它一直要求我将其强制转换为observer

下面是代码

public void fetchSubscriptionPlans(String url, String apiKey, String authToken,
                                   final Consumer<List<ContentDatum>> subscriptionPlans) {
    appCMSSubscriptionPlanRest.getPlansById(url,authHeaders).enqueue(new Callback<List<ContentDatum>>() {
        @Override
        public void onResponse(Call<List<ContentDatum>> call, Response<List<ContentDatum>> response) {
            try {

                Observable.just(response.body())
                        .onErrorResumeNext(throwable -> Observable.empty())
                        .subscribe(subscriptionPlans);
            } catch (Exception e) {
                Observable.just((List<ContentDatum>) null)
                        .onErrorResumeNext(throwable -> Observable.empty())
                        .subscribe(subscriptionPlans);
            }
        }

        @Override
        public void onFailure(Call<List<ContentDatum>> call, Throwable t) {

        }
    });
}

我在.subscribe(subscriptionPlans);上发现错误,将其转换为.subscribe((observer<;?super list<;contentdatum>;)subscriptionPlans);

正确的方法应该是什么?


共1个答案

匿名用户

首先,确保您的消费者的类型为:IO.reactivex.functions.consumer

其次,我认为您的response.body()返回的不是ContentDatum列表

在执行以下操作时也会出现相同的错误:

Consumer<Integer> subscriptionPlans = list -> {};

Observable.just("Hello")
        .onErrorResumeNext((Throwable throwable) -> Observable.empty())
        .subscribe(subscriptionPlans);