Java源码示例:org.springframework.integration.acks.AcknowledgmentCallback.Status
示例1
@Override
protected MessageHandler getPolledConsumerErrorMessageHandler(
ConsumerDestination destination, String group,
ExtendedConsumerProperties<RocketMQConsumerProperties> properties) {
return message -> {
if (message.getPayload() instanceof MessagingException) {
AcknowledgmentCallback ack = StaticMessageHeaderAccessor
.getAcknowledgmentCallback(
((MessagingException) message.getPayload())
.getFailedMessage());
if (ack != null) {
if (properties.getExtension().shouldRequeue()) {
ack.acknowledge(Status.REQUEUE);
}
else {
ack.acknowledge(Status.REJECT);
}
}
}
};
}
示例2
@Override
protected MessageHandler getPolledConsumerErrorMessageHandler(
ConsumerDestination destination, String group,
ExtendedConsumerProperties<RabbitConsumerProperties> properties) {
MessageHandler handler = getErrorMessageHandler(destination, group, properties);
if (handler != null) {
return handler;
}
final MessageHandler superHandler = super.getErrorMessageHandler(destination,
group, properties);
return message -> {
Message amqpMessage = (Message) message.getHeaders()
.get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE);
if (!(message instanceof ErrorMessage)) {
logger.error("Expected an ErrorMessage, not a "
+ message.getClass().toString() + " for: " + message);
}
else if (amqpMessage == null) {
if (superHandler != null) {
superHandler.handleMessage(message);
}
}
else {
if (message.getPayload() instanceof MessagingException) {
AcknowledgmentCallback ack = StaticMessageHeaderAccessor
.getAcknowledgmentCallback(
((MessagingException) message.getPayload())
.getFailedMessage());
if (ack != null) {
if (properties.getExtension().isRequeueRejected()) {
ack.acknowledge(Status.REQUEUE);
}
else {
ack.acknowledge(Status.REJECT);
}
}
}
}
};
}
示例3
@Test
public void testRequeue() {
TestChannelBinder binder = createBinder();
MessageConverterConfigurer configurer = this.context
.getBean(MessageConverterConfigurer.class);
DefaultPollableMessageSource pollableSource = new DefaultPollableMessageSource(
this.messageConverter);
configurer.configurePolledMessageSource(pollableSource, "foo");
AcknowledgmentCallback callback = mock(AcknowledgmentCallback.class);
pollableSource.addInterceptor(new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
return MessageBuilder.fromMessage(message)
.setHeader(
IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK,
callback)
.build();
}
});
ExtendedConsumerProperties<Object> properties = new ExtendedConsumerProperties<>(null);
properties.setMaxAttempts(2);
properties.setBackOffInitialInterval(0);
binder.bindPollableConsumer("foo", "bar", pollableSource, properties);
final AtomicInteger count = new AtomicInteger();
try {
assertThat(pollableSource.poll(received -> {
count.incrementAndGet();
throw new RequeueCurrentMessageException("test retry");
})).isTrue();
}
catch (Exception e) {
// no op
}
assertThat(count.get()).isEqualTo(2);
verify(callback).acknowledge(Status.REQUEUE);
}
示例4
@Test
public void testRequeueFromErrorFlow() {
TestChannelBinder binder = createBinder();
MessageConverterConfigurer configurer = this.context
.getBean(MessageConverterConfigurer.class);
DefaultPollableMessageSource pollableSource = new DefaultPollableMessageSource(
this.messageConverter);
configurer.configurePolledMessageSource(pollableSource, "foo");
AcknowledgmentCallback callback = mock(AcknowledgmentCallback.class);
pollableSource.addInterceptor(new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
return MessageBuilder.fromMessage(message)
.setHeader(
IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK,
callback)
.build();
}
});
ExtendedConsumerProperties<Object> properties = new ExtendedConsumerProperties<>(null);
properties.setMaxAttempts(1);
binder.bindPollableConsumer("foo", "bar", pollableSource, properties);
SubscribableChannel errorChannel = new DirectChannel();
errorChannel.subscribe(msg -> {
throw new RequeueCurrentMessageException((Throwable) msg.getPayload());
});
pollableSource.setErrorChannel(errorChannel);
try {
pollableSource.poll(received -> {
throw new RuntimeException("test requeue from error flow");
});
}
catch (Exception e) {
// no op
}
verify(callback).acknowledge(Status.REQUEUE);
}