Asked by: 小点点

How to requeue a message when a Spring Integration configuration includes a priority channel


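I have a Spring Integration configuration that uses a priority channel. When an item is read from that channel, local resources are checked at that point in time, and if the resources cannot handle the item, I want to requeue the message so that another machine picks it up. Initially, I mistakenly threw an exception, thinking a requeue would happen, but as was answered in another question of mine, this does not work because the priority channel executes on a different thread from the listener container.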

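I thought of putting a filter after the inbound channel adapter that throws an exception if the resource is not available at that time, but an accurate assessment of resources cannot be made at that point, because resource availability then does not necessarily match the resources available when the message is later selected by priority.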

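My next thought was to put a filter after the priority channel and before the service activator, and to direct messages that current resources cannot handle to a discard channel, where the discard channel is defined as an outbound channel adapter that sends the message back to the original queue. Are there any pitfalls with this approach?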
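To make the proposed routing concrete, here is a minimal in-memory sketch of it, not Spring Integration code. It assumes a `PriorityBlockingQueue` as a stand-in for the priority channel, a plain `Queue` as a stand-in for the broker queue, and a hypothetical `Job` payload with a `requiredUnits` field; all of these names are illustrative and do not come from the configuration below.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.function.Predicate;

/** In-memory model of "filter after the priority channel, discards go back to the source queue". */
final class DiscardRequeueSketch {

    /** Hypothetical payload: what the job needs once it is finally selected. */
    static final class Job {
        final String id;
        final int priority;
        final int requiredUnits;

        Job(String id, int priority, int requiredUnits) {
            this.id = id;
            this.priority = priority;
            this.requiredUnits = requiredUnits;
        }
    }

    /** Stand-in for the priority channel: highest priority is polled first. */
    static PriorityBlockingQueue<Job> newPriorityChannel() {
        return new PriorityBlockingQueue<>(11,
                Comparator.comparingInt((Job j) -> j.priority).reversed());
    }

    /**
     * Drains the channel. Jobs accepted by the resource check are "handled";
     * rejected jobs take the discard path and are appended to brokerQueue.
     */
    static List<String> drain(PriorityBlockingQueue<Job> priorityChannel,
                              Predicate<Job> canHandle,
                              Queue<Job> brokerQueue) {
        List<String> handled = new ArrayList<>();
        Job job;
        while ((job = priorityChannel.poll()) != null) {
            if (canHandle.test(job)) {
                handled.add(job.id);   // would continue to the service activator
            } else {
                brokerQueue.add(job);  // discard channel -> outbound adapter -> original queue
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        PriorityBlockingQueue<Job> channel = newPriorityChannel();
        channel.add(new Job("small-low", 1, 2));
        channel.add(new Job("big-high", 5, 8));
        channel.add(new Job("small-mid", 3, 1));

        int availableUnits = 4; // assumed local capacity at selection time
        Queue<Job> brokerQueue = new ArrayDeque<>();
        List<String> handled = drain(channel, j -> j.requiredUnits <= availableUnits, brokerQueue);

        System.out.println("handled=" + handled + ", requeued=" + brokerQueue.size());
    }
}
```

One thing the model makes visible: the rejected job reaches the broker queue as a new publish, not as a redelivery of the original. In the real flow, with `AcknowledgeMode.AUTO`, the original delivery has most likely been acknowledged by the time the message sits in the priority channel, so anything that goes wrong between the filter and the outbound adapter is not covered by the broker.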

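Edit 20150917:

Following Gary's suggestion, I have moved to RabbitMQ 3.5.x to get built-in priority queues. I am now having trouble tracking the number of attempts, because my original message seems to be put back on the queue rather than the message I modified. I have updated the code block to reflect the current setup.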

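Edit 20150922:

I am updating this post to reflect the final proof-of-concept code base I created. I am by no means a Spring Integration expert, so please keep that in mind, along with the fact that this test code is not production ready. My original intent was to resubmit the message and retry a certain number of times when a specific exception is thrown. This can be done using a StatefulRetryOperationsInterceptor. But to experiment further, I wanted to be able to set/increment a header on failure and then have something in my flow that could react to that value. This is accomplished by using an extension of RepublishMessageRecoverer that overrides additionalHeaders(). This object is then used to configure the RetryOperationsInterceptor.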
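The header bookkeeping described above can be looked at in isolation. The following is a small sketch using plain maps instead of AMQP message properties; the header name `jmattempts` is the one used in the configuration below, while the class and method names are hypothetical. It mirrors the counting in the listing (the first republish stores 0, each later one adds 1), with one assumption of its own: a missing or non-numeric header is treated as "never republished" rather than raising an exception.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-map model of the attempts header set on each republish. */
final class AttemptsHeaderSketch {

    static final String ATTEMPTS_HEADER = "jmattempts";

    /** Returns the stored count, or -1 when the header is absent or not a number (assumption). */
    static int currentAttempts(Map<String, Object> headers) {
        Object raw = headers.get(ATTEMPTS_HEADER);
        if (raw == null) {
            return -1;
        }
        try {
            return Integer.parseInt(raw.toString().trim());
        } catch (NumberFormatException e) {
            return -1;
        }
    }

    /** Extra headers for the republished copy: 0 the first time, previous value + 1 afterwards. */
    static Map<String, Object> headersForRepublish(Map<String, Object> headers) {
        Map<String, Object> extra = new HashMap<>();
        extra.put(ATTEMPTS_HEADER, currentAttempts(headers) + 1);
        return extra;
    }

    /** The part of the flow that "reacts to the value": stop retrying once the limit is reached. */
    static boolean shouldGiveUp(Map<String, Object> headers, int maxRetries) {
        return currentAttempts(headers) >= maxRetries;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        for (int i = 0; i < 3; i++) {
            headers.putAll(headersForRepublish(headers));
            System.out.println(ATTEMPTS_HEADER + "=" + headers.get(ATTEMPTS_HEADER)
                    + ", giveUp=" + shouldGiveUp(headers, 2));
        }
    }
}
```

Reading the value through `toString()` keeps the helper tolerant of header mappers that deliver the number as an `Integer`, a `Long`, or a `String`.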

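One more small thing: when my signal exception is thrown, I wanted to cut down on some of the default Spring Integration logging, so I needed to make sure I named my error channel "errorChannel" to replace the Spring Integration default. I also needed to create a custom ErrorHandler and assign it to the ListenerContainer, whose default handler logs everything at ERROR level.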

Here is my current setup:

Spring Integration 4.2.0.RELEASE

Spring AMQP 1.5.0.RELEASE

RabbitMQ 3.5.x

Configuration

@Autowired
public void setSpringIntegrationConfigHelper (SpringIntegrationHelper springIntegrationConfigHelper) {
    this.springIntegrationConfigHelper = springIntegrationConfigHelper;   
}

@Bean
public String priorityPOCQueueName() {
    return "poc.priority";
}

@Bean
public Queue priorityPOCQueue(RabbitAdmin rabbitAdmin) {
    boolean durable = true;
    boolean exclusive = false;
    boolean autoDelete = false;

    //Adding the x-max-priority argument is what signals RabbitMQ that this is a priority queue. Must be Rabbit 3.5.x
    Map<String,Object> arguments = new HashMap<String, Object>();
    arguments.put("x-max-priority", 5);

    Queue queue = new Queue(priorityPOCQueueName(),
            durable,
            exclusive,
            autoDelete,
            arguments);

    rabbitAdmin.declareQueue(queue);
    return queue;
}

@Bean
public Binding priorityPOCQueueBinding(RabbitAdmin rabbitAdmin) {
    Binding binding = new Binding(priorityPOCQueueName(),
                                  DestinationType.QUEUE,
                                  "amq.direct",
                                  priorityPOCQueue(rabbitAdmin).getName(),
                                  null);
    rabbitAdmin.declareBinding(binding);
    return binding;
}

@Bean
public AmqpTemplate priorityPOCMessageTemplate(ConnectionFactory amqpConnectionFactory,
                                                @Qualifier("priorityPOCQueueName") String queueName,
                                                @Qualifier("jsonMessageConverter") MessageConverter messageConverter) {
    RabbitTemplate template = new RabbitTemplate(amqpConnectionFactory);
    template.setChannelTransacted(false);
    template.setExchange("amq.direct");
    template.setQueue(queueName);
    template.setRoutingKey(queueName);
    template.setMessageConverter(messageConverter);
    return template;
}


@Autowired
@Qualifier("priorityPOCQueue")
public void setPriorityPOCQueue(Queue priorityPOCQueue) {
    this.priorityPOCQueue = priorityPOCQueue;
}

@Bean
public MessageRecoverer miTestMessageRecoverer(final AmqpTemplate priorityPOCMessageTemplate) {
    return new MessageRecoverer() {

        @Override
        public void recover(org.springframework.amqp.core.Message msg, Throwable t) {
            StringBuilder sb = new StringBuilder();
            sb.append("Firing Test Recoverer: ").append(t.getClass().getName()).append(" Message Count: ")
            .append(msg.getMessageProperties().getMessageCount())
            .append(" ID: ").append(msg.getMessageProperties().getMessageId())
            .append(" DeliveryTag: ").append(msg.getMessageProperties().getDeliveryTag())
            .append(" Redelivered: ").append(msg.getMessageProperties().isRedelivered());
            logger.debug(sb.toString());

            PriorityMessage m = new PriorityMessage(5);
            m.setId(randomGenerator.nextLong(10L, 1000000L));
            priorityPOCMessageTemplate.convertAndSend(m , new SimulateErrorHeaderPostProcessor(Boolean.FALSE, m.getPriority()));
        }

    };
}


@Bean
public RepublishMessageRecoverer miRepublishRecoverer(final AmqpTemplate priorityPOCMessageTemplate) {
    class MiRecoverer extends RepublishMessageRecoverer {

        public MiRecoverer(AmqpTemplate errorTemplate) {
            super(errorTemplate);
            this.setErrorRoutingKeyPrefix("");
        }

        @Override
        protected Map<? extends String, ? extends Object> additionalHeaders(
                org.springframework.amqp.core.Message message, Throwable cause) {
            Map<String, Object> map = new HashMap<>();
            if (!message.getMessageProperties().getHeaders().containsKey("jmattempts")) {
                map.put("jmattempts", 0);
            } else {
                Integer count = Integer.valueOf(message.getMessageProperties().getHeaders().get("jmattempts").toString());
                map.put("jmattempts", ++count);
            }
            return map;
        }
    }
    return new MiRecoverer(priorityPOCMessageTemplate);
}

@Bean
public StatefulRetryOperationsInterceptor inadequateResourceInterceptor(@Qualifier("priorityPOCMessageTemplate") AmqpTemplate priorityPOCMessageTemplate
        , @Qualifier("priorityMessageKeyGenerator") PriorityMessageKeyGenerator priorityMessageKeyGenerator
        ,  @Qualifier("miTestMessageRecoverer") MessageRecoverer messageRecoverer
        , @Qualifier("miRepublishRecoverer") RepublishMessageRecoverer miRepublishRecoverer) {
    StatefulRetryInterceptorBuilder b = RetryInterceptorBuilder.stateful();
    return b.maxAttempts(2)
            .backOffOptions(2000L, 1.0D, 4000L)
            .messageKeyGenerator(priorityMessageKeyGenerator)
            .recoverer(miRepublishRecoverer)
            .build();
}


@Bean(name="exec.priorityPOC")
TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor e = new ThreadPoolTaskExecutor();
    e.setCorePoolSize(1);
    e.setQueueCapacity(1);
    return e;
}

/*    @Bean(name="poc.priorityChannel")
public MessageChannel pocPriorityChannel() {
    PriorityChannel c = new PriorityChannel(new PriorityComparator());
    c.setComponentName("poc.priorityChannel");
    c.setBeanName("poc.priorityChannel");
    return c;
}
*/
@Bean(name="poc.inputChannel")
public MessageChannel pocPriorityChannel() {
    DirectChannel c = new DirectChannel();
    c.setComponentName("poc.inputChannel");
    c.setBeanName("poc.inputChannel");
    return c;
}

@Bean(name="poc.inboundChannelAdapter") //make this a unique name
public AmqpInboundChannelAdapter amqpInboundChannelAdapter(@Qualifier("exec.priorityPOC") TaskExecutor taskExecutor
        , @Qualifier("errorChannel") MessageChannel pocErrorChannel
        , @Qualifier("inadequateResourceInterceptor") StatefulRetryOperationsInterceptor inadequateResourceInterceptor) {

    org.aopalliance.aop.Advice[] adviceChain = new org.aopalliance.aop.Advice[]{inadequateResourceInterceptor};
    int concurrentConsumers = 1;
    AmqpInboundChannelAdapter a =  springIntegrationConfigHelper.createInboundChannelAdapter(taskExecutor
            , pocPriorityChannel(), new Queue[]{priorityPOCQueue},  concurrentConsumers, adviceChain
            , new PocErrorHandler());
    a.setErrorChannel(pocErrorChannel);
    return a;

}

@Transformer(inputChannel = "poc.inputChannel", outputChannel = "poc.procesPoc")
public Message<PriorityMessage> incrementAttempts(Message<PriorityMessage> msg) {
    //I stopped using this in the POC.
    return msg;
}

@ServiceActivator(inputChannel="poc.procesPoc")
public void procesPoc(@Header(SimulateErrorHeaderPostProcessor.ERROR_SIMULATE_HEADER_KEY) Boolean simulateError
        ,  @Headers Map<String, Object> headerMap
        , PriorityMessage priorityMessage) throws InterruptedException {
    if (isFirstMessageReceived == false) {
        //Thread.sleep(15000); //Cause a bit of a backup so we can see prioritizing in action.
        isFirstMessageReceived = true;
    }
    Integer retryAttempts = 0;
    if (headerMap.containsKey("jmattempts")) {
        retryAttempts = Integer.valueOf(headerMap.get("jmattempts").toString());
    }
    logger.debug("Received message with priority: " + priorityMessage.getPriority() + ", simulateError: " + simulateError +  ", Current attempts count is "
        + retryAttempts);
    if (simulateError && retryAttempts < PriorityMessage.MAX_MESSAGE_RETRY_COUNT) {
        logger.debug(" Simulating an error and re-queueing. Current attempt count is " + retryAttempts);
        throw new AnalyzerNonAdequateResourceException();
    } else if (simulateError) {
        //retryAttempts >= MAX_MESSAGE_RETRY_COUNT: stop simulating errors and let the message complete
        logger.debug(" Max attempt count reached");
    }
}

/**************************************************************************************************
 * 
 *                        Error Channel
 *
 **************************************************************************************************/
//Note that we want to override default Spring error channel, so the name of the bean must be errorChannel
@Bean(name="errorChannel")
public MessageChannel pocErrorChannel() {
    DirectChannel c = new DirectChannel();
    c.setComponentName("errorChannel");
    c.setBeanName("errorChannel");
    return c;
}

@ServiceActivator(inputChannel="errorChannel")
public void pocHandleError(Message<MessagingException> message) throws Throwable {
     MessagingException me = message.getPayload();
    logger.error("pocHandleError: error encountered: " +  me.getCause().getClass().getName());
    SortedMap<String, Object> sorted= new TreeMap<>();
    sorted.putAll(me.getFailedMessage().getHeaders());
    if (me.getCause() instanceof AnalyzerNonAdequateResourceException) {
        logger.debug("Headers: " + sorted.toString());
        //Let this message get requeued
        throw me.getCause();

    }


    Message<?> failedMsg = me.getFailedMessage();
    Object o = failedMsg.getPayload();
    StringBuilder sb = new StringBuilder();
    if (o != null) {
        sb.append("AnalyzerErrorHandler: Failed Message Type: ")
           .append(o.getClass().getCanonicalName()).append(". toString: ").append(o.toString());
        logger.error(sb.toString());
    }

    //The first level sometimes brings back either MessageHandlingException or
    //MessageTransformationException which may contain a subcause
    Throwable cause = me.getCause();
    Throwable e = cause;
    int i = 0;
    sb.delete(0, sb.length());
    sb.append("AnalyzerErrorHandler nested messages: ");
    //Walk the cause chain (capped at 10 levels) so each nested exception is logged once
    while (e != null && i++ < 10) {
        sb.append(System.lineSeparator()).append("  ")
        .append(e.getClass().getCanonicalName()).append(": ")
        .append(e.getMessage());
        e = e.getCause();
    }
    if (i > 0) {
        logger.error(sb.toString());
    }

    //Don't want a message to recycle
    throw new AmqpRejectAndDontRequeueException(cause);
}



/**
 * This gets set on the ListenerContainer.  The default handler on the listener
 * container logs everything with full stack trace.  We don't want to do that
 * for our known resource exception
 */
public static class PocErrorHandler implements ErrorHandler {

    @Override
    public void handleError(Throwable t) {
        Throwable cause = t.getCause();
        if (cause != null) {
            while (cause.getCause() != null) {
                cause = cause.getCause();
            }
        } else {
            cause = t;
        }
        if (cause instanceof AnalyzerNonAdequateResourceException) {
            logger.info(AnalyzerNonAdequateResourceException.class.getName() + ": not enough resources to process the item.");
            return;
        }
        else {
            logger.error("POC Listener Exception",  t);
        }
    }
}

Spring Integration helper

protected ConnectionFactory connectionFactory;
protected MessageConverter messageConverter;

@Autowired
public void setConnectionFactory (ConnectionFactory connectionFactory) {
    this.connectionFactory = connectionFactory;
}

@Autowired
public void setMessageConverter(@Qualifier("jsonMessageConverter") MessageConverter messageConverter) {
    this.messageConverter = messageConverter;
}

public AmqpInboundChannelAdapter createInboundChannelAdapter(TaskExecutor taskExecutor
        , MessageChannel outputChannel, Queue[] queues, int concurrentConsumers
        , org.aopalliance.aop.Advice[] adviceChain,
        ErrorHandler errorHandler) {
    SimpleMessageListenerContainer listenerContainer =
            new SimpleMessageListenerContainer(connectionFactory);
    //AUTO is default, but setting it anyhow.
    listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
    listenerContainer.setAutoStartup(true);
    listenerContainer.setConcurrentConsumers(concurrentConsumers);
    listenerContainer.setMessageConverter(messageConverter);
    listenerContainer.setQueues(queues);
    //listenerContainer.setChannelTransacted(false);
    listenerContainer.setErrorHandler(errorHandler);
    listenerContainer.setPrefetchCount(1);
    listenerContainer.setTaskExecutor(taskExecutor);
    listenerContainer.setDefaultRequeueRejected(true);
    if (adviceChain != null && adviceChain.length > 0) {
        listenerContainer.setAdviceChain(adviceChain);
    }



    AmqpInboundChannelAdapter a = new AmqpInboundChannelAdapter(listenerContainer);
    a.setMessageConverter(messageConverter);
    a.setAutoStartup(true);
    a.setHeaderMapper(MyAmqpHeaderMapper.createPassAllHeaders());
    a.setOutputChannel(outputChannel);
    return a;
}

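1 answer

Anonymous user

It is not clear why you would use a PriorityChannel in this scenario; why not use a priority queue in RabbitMQ? That way, you can run the flow on the container thread.

Sending the message to the back of the queue yourself will work, but there is a risk of message loss.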
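One way to picture that risk is the ordering of "acknowledge the original" and "publish the copy". The sketch below is an assumption-laden in-memory model, not RabbitMQ client code: `FakeBroker` and its methods are hypothetical, and `connectionLost()` imitates a broker returning unacknowledged deliveries to the queue when the consumer goes away.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

/** Compares "ack then republish" with "republish then ack" when the publish fails. */
final class RequeueOrderSketch {

    /** Hypothetical stand-in for a queue with manual acknowledgements. */
    static final class FakeBroker {
        final Deque<String> ready = new ArrayDeque<>();
        final Set<String> unacked = new HashSet<>();
        boolean failNextPublish;

        void publish(String body) {
            if (failNextPublish) {
                failNextPublish = false;
                throw new IllegalStateException("publish failed");
            }
            ready.addLast(body);
        }

        String deliver() {
            String body = ready.pollFirst();
            if (body != null) {
                unacked.add(body);
            }
            return body;
        }

        void ack(String body) {
            unacked.remove(body);
        }

        /** Unacknowledged deliveries become ready again when the consumer disappears. */
        void connectionLost() {
            ready.addAll(unacked);
            unacked.clear();
        }

        /** Number of copies of the body the broker still holds. */
        int copies(String body) {
            int n = unacked.contains(body) ? 1 : 0;
            for (String s : ready) {
                if (s.equals(body)) {
                    n++;
                }
            }
            return n;
        }
    }

    /** The original is gone before the copy is safely stored. */
    static void ackThenRepublish(FakeBroker broker, String body) {
        broker.ack(body);
        broker.publish(body);
    }

    /** The original stays unacknowledged until the copy is stored. */
    static void republishThenAck(FakeBroker broker, String body) {
        broker.publish(body);
        broker.ack(body);
    }

    /** Runs one failing requeue attempt and reports how many copies survive. */
    static int survivingCopies(boolean ackFirst) {
        FakeBroker broker = new FakeBroker();
        broker.publish("m1");
        String body = broker.deliver();
        broker.failNextPublish = true;
        try {
            if (ackFirst) {
                ackThenRepublish(broker, body);
            } else {
                republishThenAck(broker, body);
            }
        } catch (IllegalStateException e) {
            broker.connectionLost(); // consumer dies after the failed publish
        }
        return broker.copies("m1");
    }

    public static void main(String[] args) {
        System.out.println("ack first:       copies left = " + survivingCopies(true));
        System.out.println("republish first: copies left = " + survivingCopies(false));
    }
}
```

In this model the ack-first ordering loses the message when the publish fails, while the publish-first ordering keeps it, at the cost of a possible duplicate if the consumer dies between a successful publish and the ack. With a queue channel and `AcknowledgeMode.AUTO` the acknowledgement has typically already happened before the manual re-send, which is closer to the ack-first case.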