Java源码示例:org.springframework.integration.acks.AcknowledgmentCallback.Status

示例1
@Override
protected MessageHandler getPolledConsumerErrorMessageHandler(
		ConsumerDestination destination, String group,
		ExtendedConsumerProperties<RocketMQConsumerProperties> properties) {
	return message -> {
		if (message.getPayload() instanceof MessagingException) {
			AcknowledgmentCallback ack = StaticMessageHeaderAccessor
					.getAcknowledgmentCallback(
							((MessagingException) message.getPayload())
									.getFailedMessage());
			if (ack != null) {
				if (properties.getExtension().shouldRequeue()) {
					ack.acknowledge(Status.REQUEUE);
				}
				else {
					ack.acknowledge(Status.REJECT);
				}
			}
		}
	};
}
 
示例2
@Override
protected MessageHandler getPolledConsumerErrorMessageHandler(
		ConsumerDestination destination, String group,
		ExtendedConsumerProperties<RabbitConsumerProperties> properties) {
	MessageHandler handler = getErrorMessageHandler(destination, group, properties);
	if (handler != null) {
		return handler;
	}
	final MessageHandler superHandler = super.getErrorMessageHandler(destination,
			group, properties);
	return message -> {
		Message amqpMessage = (Message) message.getHeaders()
				.get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE);
		if (!(message instanceof ErrorMessage)) {
			logger.error("Expected an ErrorMessage, not a "
					+ message.getClass().toString() + " for: " + message);
		}
		else if (amqpMessage == null) {
			if (superHandler != null) {
				superHandler.handleMessage(message);
			}
		}
		else {
			if (message.getPayload() instanceof MessagingException) {
				AcknowledgmentCallback ack = StaticMessageHeaderAccessor
						.getAcknowledgmentCallback(
								((MessagingException) message.getPayload())
										.getFailedMessage());
				if (ack != null) {
					if (properties.getExtension().isRequeueRejected()) {
						ack.acknowledge(Status.REQUEUE);
					}
					else {
						ack.acknowledge(Status.REJECT);
					}
				}
			}
		}
	};
}
 
示例3
@Test
public void testRequeue() {
	TestChannelBinder binder = createBinder();
	MessageConverterConfigurer configurer = this.context
			.getBean(MessageConverterConfigurer.class);

	DefaultPollableMessageSource pollableSource = new DefaultPollableMessageSource(
			this.messageConverter);
	configurer.configurePolledMessageSource(pollableSource, "foo");
	AcknowledgmentCallback callback = mock(AcknowledgmentCallback.class);
	pollableSource.addInterceptor(new ChannelInterceptor() {

		@Override
		public Message<?> preSend(Message<?> message, MessageChannel channel) {
			return MessageBuilder.fromMessage(message)
					.setHeader(
							IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK,
							callback)
					.build();
		}

	});
	ExtendedConsumerProperties<Object> properties = new ExtendedConsumerProperties<>(null);
	properties.setMaxAttempts(2);
	properties.setBackOffInitialInterval(0);
	binder.bindPollableConsumer("foo", "bar", pollableSource, properties);
	final AtomicInteger count = new AtomicInteger();
	try {
		assertThat(pollableSource.poll(received -> {
			count.incrementAndGet();
			throw new RequeueCurrentMessageException("test retry");
		})).isTrue();
	}
	catch (Exception e) {
		// no op
	}
	assertThat(count.get()).isEqualTo(2);
	verify(callback).acknowledge(Status.REQUEUE);
}
 
示例4
@Test
public void testRequeueFromErrorFlow() {
	TestChannelBinder binder = createBinder();
	MessageConverterConfigurer configurer = this.context
			.getBean(MessageConverterConfigurer.class);

	DefaultPollableMessageSource pollableSource = new DefaultPollableMessageSource(
			this.messageConverter);
	configurer.configurePolledMessageSource(pollableSource, "foo");
	AcknowledgmentCallback callback = mock(AcknowledgmentCallback.class);
	pollableSource.addInterceptor(new ChannelInterceptor() {

		@Override
		public Message<?> preSend(Message<?> message, MessageChannel channel) {
			return MessageBuilder.fromMessage(message)
					.setHeader(
							IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK,
							callback)
					.build();
		}

	});
	ExtendedConsumerProperties<Object> properties = new ExtendedConsumerProperties<>(null);
	properties.setMaxAttempts(1);
	binder.bindPollableConsumer("foo", "bar", pollableSource, properties);
	SubscribableChannel errorChannel = new DirectChannel();
	errorChannel.subscribe(msg -> {
		throw new RequeueCurrentMessageException((Throwable) msg.getPayload());
	});
	pollableSource.setErrorChannel(errorChannel);
	try {
		pollableSource.poll(received -> {
			throw new RuntimeException("test requeue from error flow");
		});
	}
	catch (Exception e) {
		// no op
	}
	verify(callback).acknowledge(Status.REQUEUE);
}