Java源码示例:rx.plugins.RxJavaHooks
示例1
/**
 * Delivers {@code e} to the downstream as the terminal error event, unless an error or
 * completion has already been sent; in that case the error is logged and routed to
 * {@link RxJavaHooks#onError(Throwable)} as undeliverable. After a successful delivery the
 * worker is shut down.
 *
 * @param e the error to deliver
 */
private void sendError(Throwable e) {
    final boolean alreadyTerminated = sentError || sentCompleted;
    if (alreadyTerminated) {
        logger.error("Another terminal event was already sent, emitting as undeliverable", e);
        RxJavaHooks.onError(e);
    } else {
        sentError = true;
        // Not strictly required (the sentError flag above should already stop periodic
        // scheduling), but kept for completeness.
        done = true;
        downstream.onError(e);
        worker.shutdown();
    }
}
示例2
/**
 * Forwards the body of a successful response downstream. An unsuccessful response marks the
 * subscriber as terminated and is signalled as an {@code HttpException} via {@code onError};
 * anything thrown by that {@code onError} call is reported to the global RxJava error hook.
 */
@Override
public void onNext(Response<R> response) {
    if (!response.isSuccessful()) {
        subscriberTerminated = true;
        Throwable httpFailure = new HttpException(response);
        try {
            subscriber.onError(httpFailure);
        } catch (OnCompletedFailedException | OnErrorFailedException | OnErrorNotImplementedException e) {
            // These already wrap the original failure; hand them to the hook unchanged.
            RxJavaHooks.getOnError().call(e);
        } catch (Throwable inner) {
            Exceptions.throwIfFatal(inner);
            // Report both the HTTP failure and the failure raised while delivering it.
            RxJavaHooks.getOnError().call(new CompositeException(httpFailure, inner));
        }
        return;
    }
    subscriber.onNext(response.body());
}
示例3
/**
 * Converts an upstream error into a {@code Result.error} item followed by completion. If
 * emitting that item throws, the thrown exception is signalled through {@code onError}
 * instead, and any failure of that call goes to the global RxJava error hook.
 */
@Override
public void onError(Throwable throwable) {
    Throwable emissionFailure = null;
    try {
        subscriber.onNext(Result.<R>error(throwable));
    } catch (Throwable t) {
        emissionFailure = t;
    }

    if (emissionFailure == null) {
        subscriber.onCompleted();
        return;
    }

    try {
        subscriber.onError(emissionFailure);
    } catch (OnCompletedFailedException | OnErrorFailedException | OnErrorNotImplementedException e) {
        RxJavaHooks.getOnError().call(e);
    } catch (Throwable inner) {
        Exceptions.throwIfFatal(inner);
        RxJavaHooks.getOnError().call(new CompositeException(emissionFailure, inner));
    }
}
示例4
/**
 * Wraps {@code base} so that it runs with the mock scheduler function installed for the IO and
 * new-thread schedulers and with the mock RxAndroid schedulers hook registered.
 *
 * <p>The hooks are reset in a {@code finally} block, so a failing test (an exception thrown by
 * {@code base.evaluate()}) or a failure while installing the hooks cannot leave them in place
 * for subsequent tests.
 *
 * @param base the statement to run with the hooks in place
 * @param description description of the test; not used here
 * @return a statement that installs the hooks, evaluates {@code base}, then resets the hooks
 */
@Override
public Statement apply(final Statement base, Description description) {
    return new Statement() {
        @Override
        public void evaluate() throws Throwable {
            // Start from a clean slate in case an earlier test left hooks behind.
            RxJavaHooks.reset();
            RxAndroidPlugins.getInstance().reset();
            try {
                RxJavaHooks.setOnIOScheduler(mMockSchedulerFunc);
                RxJavaHooks.setOnNewThreadScheduler(mMockSchedulerFunc);
                RxAndroidPlugins.getInstance().registerSchedulersHook(mRxAndroidSchedulersHook);
                base.evaluate();
            } finally {
                // Always undo the hooks, even when the test itself throws.
                RxJavaHooks.reset();
                RxAndroidPlugins.getInstance().reset();
            }
        }
    };
}
示例5
/**
 * Per-test fixture setup: checks the rabbit container, reads its ports, builds the channel,
 * consumer and publisher factories against {@code amqp://localhost:<tcpPort>}, prepares the
 * input queue, creates a publisher, and clears any IO scheduler hook.
 */
@Before
public void setup() throws Exception {
// NOTE(review): presumably fails the test early when the broker container is not running.
dockerContainers.rabbit().assertUp();
rabbitTcpPort = dockerContainers.rabbit().tcpPort();
rabbitAdminPort = dockerContainers.rabbit().adminPort();
log.infoWithParams("****** Rabbit broker is up and running *****");
BrokerAddresses addresses = new BrokerAddresses("amqp://localhost:" + rabbitTcpPort);
channelFactory = new DefaultChannelFactory(addresses, connectionSettings);
consumerFactory = new DefaultConsumerFactory(channelFactory, consumeSettings);
publisherFactory = new DefaultPublisherFactory(channelFactory, publishSettings);
httpClient = new AsyncHttpClient();
// Drop messages recorded by a previous test.
messagesSeen.clear();
createQueues(channelFactory, inputQueue, new Exchange(inputExchange));
publisher = publisherFactory.createPublisher();
// A null hook restores the default IO scheduler, undoing any override set by another test.
RxJavaHooks.setOnIOScheduler(null);
}
示例6
/**
 * Builds the view model under test with a mocked use case and event bus, and makes the IO
 * scheduler run immediately.
 */
@Before
public void setUp() {
    BaseActivity hostActivity = Robolectric.setupActivity(UserActivity.class);

    // Mock of the use case: "user" resolves to a stub user, "wrong" fails.
    User stubUser = new User();
    stubUser.id = 1L;
    stubUser.login = "user";
    GetUserUseCase useCaseMock = mock(GetUserUseCase.class);
    when(useCaseMock.run("user")).thenReturn(Single.just(stubUser));
    when(useCaseMock.run("wrong")).thenReturn(Single.error(new Throwable("error")));

    // Mock of the event bus.
    eventBus = mock(EventBus.class);

    // Create the view model.
    userViewModel = new UserActivityViewModel(hostActivity, useCaseMock, eventBus);

    // Switch the subscribeOn thread from io to immediate.
    RxJavaHooks.setOnIOScheduler(original -> Schedulers.immediate());
}
示例7
/**
 * Verifies that the problematic observable completes with the single value "OK" and never
 * reaches the global RxJava error hook.
 */
@Test
public void testB() {
    final AtomicBoolean globalHookInvoked = new AtomicBoolean(false);
    RxJavaHooks.setOnError(undelivered -> globalHookInvoked.set(true));

    TestSubscriber<Object> probe = new TestSubscriber<>();
    createProblematicObservable().subscribe(probe);
    probe.awaitTerminalEvent();

    // The global hook must *not* have fired: Observable.onErrorResumeNext is expected to have
    // handled the error instead.
    Assert.assertFalse(globalHookInvoked.get());
    probe.assertNoErrors();
    probe.assertValue("OK");
    probe.assertCompleted();
}
示例8
/**
 * Creates an error callback for the given effect handler. Each error it receives is wrapped
 * in a {@code ConnectionException} carrying the handler's class description and is passed to
 * {@link RxJavaHooks#onError(Throwable)}.
 *
 * @param effectHandler the handler whose class is named in the wrapping exception
 * @return the error callback
 */
public Action1<Throwable> call(final Transformer<? extends F, E> effectHandler) {
    return new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
            // Resolved per invocation, exactly when an error arrives.
            String handlerDescription = effectHandler.getClass().toString();
            Throwable wrapped = new ConnectionException(handlerDescription, throwable);
            RxJavaHooks.onError(wrapped);
        }
    };
}
示例9
/**
 * Schedules {@code e} to be sent on the worker as an error action. If an error is already
 * scheduled, or an error or completion has already been sent, {@code e} is logged and routed
 * to {@link RxJavaHooks#onError(Throwable)} as undeliverable instead.
 *
 * @param e the error to schedule
 */
public void offerError(Throwable e) {
    final boolean terminalAlreadyPending = isOnErrorScheduled || sentError || sentCompleted;
    if (!terminalAlreadyPending) {
        isOnErrorScheduled = true;
        worker.schedule(ACTION_ERROR, () -> sendError(e));
    } else {
        logger.error("Another terminal event was already sent, emitting as undeliverable", e);
        RxJavaHooks.onError(e);
    }
}
示例10
/**
 * Records the error in the shared error holder; on success unsubscribes the other source,
 * marks this one done and drains. If the error could not be recorded it is passed to the
 * global RxJava error hook.
 */
@Override
public void onError(Throwable e) {
    if (!ExceptionsUtils.addThrowable(error, e)) {
        // The holder rejected the error, so report it globally instead.
        RxJavaHooks.onError(e);
        return;
    }
    other.unsubscribe();
    done = true;
    drain();
}
示例11
/**
 * Handles an error signalled by the other source: records it in the shared error holder,
 * then unsubscribes, marks the stream done and drains. If the error could not be recorded it
 * is passed to the global RxJava error hook.
 *
 * @param e the error from the other source
 */
void otherError(Throwable e) {
    if (!ExceptionsUtils.addThrowable(error, e)) {
        // The holder rejected the error, so report it globally instead.
        RxJavaHooks.onError(e);
        return;
    }
    unsubscribe();
    done = true;
    drain();
}
示例12
/**
 * Registers a one-shot timed action (period 0) after passing it through the scheduled-action
 * hook.
 *
 * @param action the work to run
 * @param delayTime delay before the action runs, in {@code unit}
 * @param unit unit of {@code delayTime}
 * @return the timed action, which doubles as its subscription
 */
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
    final Action0 hookedAction = RxJavaHooks.onScheduledAction(action);
    final long delayMillis = unit.toMillis(delayTime);
    TimedAction pending = new TimedAction(hookedAction, delayMillis, 0);
    actions.put(pending, DUMB);
    return pending;
}
示例13
/**
 * Registers a periodic timed action after passing it through the scheduled-action hook.
 *
 * @param action the work to run
 * @param initialDelay delay before the first run, in {@code unit}
 * @param period interval between runs, in {@code unit}
 * @param unit unit of {@code initialDelay} and {@code period}
 * @return the timed action, which doubles as its subscription
 */
@Override
public Subscription schedulePeriodically(Action0 action, long initialDelay, long period, TimeUnit unit) {
    final Action0 hookedAction = RxJavaHooks.onScheduledAction(action);
    final long initialDelayMillis = unit.toMillis(initialDelay);
    final long periodMillis = unit.toMillis(period);
    TimedAction recurring = new TimedAction(hookedAction, initialDelayMillis, periodMillis);
    actions.put(recurring, DUMB);
    return recurring;
}
示例14
/**
 * Decorates {@code base} so that, before it runs, the IO, new-thread and computation
 * scheduler hooks are redirected to the schedulers of a Vert.x context; the decorated
 * statement is then handed to the superclass rule.
 */
@Override
public Statement apply(Statement base, Description description) {
    return super.apply(new Statement() {
        @Override
        public void evaluate() throws Throwable {
            Vertx vertxInstance = vertx();
            RxJavaSchedulersHook contextHook = RxHelper.schedulerHook(vertxInstance.getOrCreateContext());
            // Each hook ignores the default scheduler and asks the Vert.x hook on every call.
            RxJavaHooks.setOnIOScheduler(unused -> contextHook.getIOScheduler());
            RxJavaHooks.setOnNewThreadScheduler(unused -> contextHook.getNewThreadScheduler());
            RxJavaHooks.setOnComputationScheduler(unused -> contextHook.getComputationScheduler());
            base.evaluate();
        }
    }, description);
}
示例15
/**
 * Moves to the terminated state and, if the subscriber is still subscribed, signals
 * completion. Anything thrown while doing so is reported to the global RxJava error hook
 * (fatal errors are rethrown first).
 */
void emitComplete() {
    set(STATE_TERMINATED);
    try {
        if (isUnsubscribed()) {
            return;
        }
        subscriber.onCompleted();
    } catch (OnCompletedFailedException | OnErrorFailedException | OnErrorNotImplementedException e) {
        RxJavaHooks.getOnError().call(e);
    } catch (Throwable t) {
        Exceptions.throwIfFatal(t);
        RxJavaHooks.getOnError().call(t);
    }
}
示例16
/**
 * Moves to the terminated state and, if the subscriber is still subscribed, signals
 * {@code t} through {@code onError}. Anything thrown by that call is reported to the global
 * RxJava error hook (fatal errors are rethrown first).
 *
 * @param t the error to deliver
 */
void emitError(Throwable t) {
    set(STATE_TERMINATED);
    if (isUnsubscribed()) {
        return;
    }
    try {
        subscriber.onError(t);
    } catch (OnCompletedFailedException | OnErrorFailedException | OnErrorNotImplementedException e) {
        RxJavaHooks.getOnError().call(e);
    } catch (Throwable inner) {
        Exceptions.throwIfFatal(inner);
        // Report the original error together with the failure raised while delivering it.
        RxJavaHooks.getOnError().call(new CompositeException(t, inner));
    }
}
示例17
/**
 * Forwards the error downstream unless the subscriber was already terminated; in that case
 * an {@code AssertionError} caused by {@code throwable} is sent to the global RxJava error
 * hook.
 */
@Override
public void onError(Throwable throwable) {
    if (subscriberTerminated) {
        Throwable broken = new AssertionError("This should never happen! Report as a bug with the full stacktrace.");
        //noinspection UnnecessaryInitCause Two-arg AssertionError constructor is 1.7+ only.
        broken.initCause(throwable);
        RxJavaHooks.getOnError().call(broken);
        return;
    }
    subscriber.onError(throwable);
}
示例18
/**
 * Forwards completion downstream unless the subscriber was already terminated; in that case
 * an {@code AssertionError} is sent to the global RxJava error hook.
 */
@Override
public void onCompleted() {
    if (subscriberTerminated) {
        Throwable broken = new AssertionError("This should never happen! Report as a bug with the full stacktrace.");
        RxJavaHooks.getOnError().call(broken);
        return;
    }
    subscriber.onCompleted();
}
示例19
/**
 * Makes the IO, new-thread and Android main-thread schedulers all resolve to
 * {@code Schedulers.immediate()}. An {@link IllegalStateException} raised while doing so is
 * ignored.
 */
public static void setupTestSchedulers() {
    try {
        RxJavaHooks.setOnIOScheduler(original -> Schedulers.immediate());
        RxJavaHooks.setOnNewThreadScheduler(original -> Schedulers.immediate());

        RxAndroidPlugins androidPlugins = RxAndroidPlugins.getInstance();
        RxAndroidSchedulersHook immediateMainThread = new RxAndroidSchedulersHook() {
            @Override
            public Scheduler getMainThreadScheduler() {
                return Schedulers.immediate();
            }
        };
        androidPlugins.registerSchedulersHook(immediateMainThread);
    } catch (IllegalStateException ignored) {
        // Deliberately ignored. NOTE(review): presumably thrown when a hook is already
        // registered - confirm.
    }
}
示例20
/**
 * Makes the IO, new-thread and Android main-thread schedulers all resolve to
 * {@code Schedulers.immediate()}. An {@link IllegalStateException} from registering the
 * Android hook is ignored.
 */
private void setupTestSchedulers() {
    RxJavaHooks.setOnIOScheduler(original -> Schedulers.immediate());
    RxJavaHooks.setOnNewThreadScheduler(original -> Schedulers.immediate());
    try {
        RxAndroidPlugins androidPlugins = RxAndroidPlugins.getInstance();
        RxAndroidSchedulersHook immediateMainThread = new RxAndroidSchedulersHook() {
            @Override
            public Scheduler getMainThreadScheduler() {
                return Schedulers.immediate();
            }
        };
        androidPlugins.registerSchedulersHook(immediateMainThread);
    } catch (IllegalStateException ignored) {
        // already registered
    }
}
示例21
/**
 * Subscribes to a consumer for a queue that does not exist, waits for ten backoff retries,
 * unsubscribes, and asserts that fewer than ten threads from the monitored factory are still
 * alive.
 */
@Test
public void consumer_closes_internal_subscriber_on_error_during_connection() throws Exception {
    // Route the IO scheduler through a thread factory whose threads can be counted.
    MonitoringTestThreadFactory monitoredThreads = new MonitoringTestThreadFactory();
    Scheduler monitoredScheduler = new CachedThreadScheduler(monitoredThreads);
    RxJavaHooks.setOnIOScheduler(defaultIo -> monitoredScheduler);

    // The backoff callback counts down once per retry.
    CountDownLatch retryLatch = new CountDownLatch(10);
    ConsumerSettings retryingSettings = new ConsumerSettings()
            .withRetryCount(ConsumerSettings.RETRY_FOREVER)
            .withNumChannels(1)
            .withPreFetchCount(1024)
            .withBackoffAlgorithm(attempt -> {
                retryLatch.countDown();
                return 1;
            });

    Observable<Message> messages = new DefaultConsumerFactory(channelFactory, retryingSettings)
            .createConsumer("non-existent-queue");
    Subscription subscription = messages.subscribe();
    retryLatch.await();
    subscription.unsubscribe();

    assertThat(monitoredThreads.getAliveThreads(), lessThan(10));
}
示例22
/**
 * Demonstrates what happens when the success callback of a Single subscription throws: the
 * global error hook and the error callback each print whatever they receive.
 */
@Test
public void test() {
    RxJavaHooks.setOnError(undelivered -> {
        System.out.println("got global error " + undelivered);
    });

    Observable.just("1")
            .first()
            .toSingle()
            .subscribe(
                    value -> {
                        System.out.println("going to throw");
                        throw new NullPointerException("bla");
                    },
                    failure -> {
                        System.out.println("got error " + failure);
                    }
            );
}
示例23
/**
 * Subscribes to {@code source} with a subscriber that is added to {@code composite} up front
 * and removes itself from it when the stream terminates.
 *
 * <p>Errors, including exceptions thrown by {@code onNext}, are routed to
 * {@link RxJavaHooks#onError(Throwable)}.
 *
 * @param source the observable to subscribe to
 * @param onNext callback invoked for every item
 * @param composite container tracking the subscription while it is active
 * @param <T> the item type
 */
public static <T> void subscribeAutoRelease(
        Observable<T> source,
        final Action1<T> onNext,
        CompositeSubscription composite) {
    Subscriber<T> selfReleasing = new Subscriber<T>() {
        @Override
        public void onNext(T item) {
            try {
                onNext.call(item);
            } catch (Throwable callbackFailure) {
                // Stop receiving items, then treat the callback failure as a stream error.
                unsubscribe();
                onError(callbackFailure);
            }
        }

        @Override
        public void onError(Throwable e) {
            composite.remove(this);
            RxJavaHooks.onError(e);
        }

        @Override
        public void onCompleted() {
            composite.remove(this);
        }
    };
    // Track the subscriber before subscribing so a synchronous termination can remove it.
    composite.add(selfReleasing);
    source.subscribe(selfReleasing);
}
示例24
/**
 * Passes the first error to the parent and marks this subscriber done; errors arriving after
 * that go to the global RxJava error hook.
 */
@Override
public void onError(Throwable e) {
    if (!done) {
        done = true;
        parent.error(e);
    } else {
        // Already terminated: report the late error globally.
        RxJavaHooks.onError(e);
    }
}
示例25
/**
 * Closes the given resource; an {@link IOException} raised by {@code close()} is passed to
 * the global RxJava error hook instead of being propagated.
 *
 * @param c the resource to close
 */
@Override
public void call(Closeable c) {
    try {
        c.close();
    } catch (IOException closeFailure) {
        RxJavaHooks.onError(closeFailure);
    }
}
示例26
/**
 * Redirects the IO, new-thread and computation scheduler hooks to the schedulers supplied by
 * the Vert.x scheduler hook created for {@code context}.
 *
 * @param context the context used to create the scheduler hook
 */
public static void initRxSchedulers(Context context) {
    final RxJavaSchedulersHook contextHook = RxHelper.schedulerHook(context);
    // Each hook ignores the default scheduler and asks the Vert.x hook on every call.
    RxJavaHooks.setOnIOScheduler(unused -> contextHook.getIOScheduler());
    RxJavaHooks.setOnNewThreadScheduler(unused -> contextHook.getNewThreadScheduler());
    RxJavaHooks.setOnComputationScheduler(unused -> contextHook.getComputationScheduler());
}
示例27
/**
 * Resets all RxJavaHooks callbacks after each test, so hooks installed by one test do not
 * carry over to the next.
 */
@After
public void after() {
RxJavaHooks.reset();
}
示例28
/**
 * Resets all RxJavaHooks callbacks. Annotated with both {@code @Before} and {@code @After},
 * so it runs on either side of every test.
 */
@Before
@After
public void resetBefore() {
RxJavaHooks.reset();
}
示例29
/**
 * Redirects the IO, new-thread and computation scheduler hooks to the schedulers supplied by
 * the Vert.x scheduler hook created for {@code vertx}.
 *
 * @param vertx the Vert.x instance used to create the scheduler hook
 */
public void schedulerHook(Vertx vertx) {
    final RxJavaSchedulersHook vertxHook = RxHelper.schedulerHook(vertx);
    // Each hook ignores the default scheduler and asks the Vert.x hook on every call.
    RxJavaHooks.setOnIOScheduler(unused -> vertxHook.getIOScheduler());
    RxJavaHooks.setOnNewThreadScheduler(unused -> vertxHook.getNewThreadScheduler());
    RxJavaHooks.setOnComputationScheduler(unused -> vertxHook.getComputationScheduler());
}
示例30
/**
 * Redirects the IO, new-thread and computation scheduler hooks to the schedulers supplied by
 * the scheduler hook that {@code io.vertx.rxjava.core.RxHelper} creates for {@code vertx}.
 *
 * @param vertx the Vert.x instance used to create the scheduler hook
 */
public void schedulerHook(Vertx vertx) {
    final RxJavaSchedulersHook vertxHook = io.vertx.rxjava.core.RxHelper.schedulerHook(vertx);
    // Each hook ignores the default scheduler and asks the Vert.x hook on every call.
    RxJavaHooks.setOnIOScheduler(unused -> vertxHook.getIOScheduler());
    RxJavaHooks.setOnNewThreadScheduler(unused -> vertxHook.getNewThreadScheduler());
    RxJavaHooks.setOnComputationScheduler(unused -> vertxHook.getComputationScheduler());
}