提问者:小点点

Kafka消费者如何以及何时使用Apache Camel等EIP框架在Kafka中提交偏移量?我们如何使用camel-kakfa异步处理重试?


路由生成器的示例代码如下:

    // For out of seq event state (reque)
    onException(OutOfSeqStateException.class)
    .logStackTrace(false).logExhaustedMessageHistory(false)
        .setHeader("eventSource", constant(EventConstants.BACKEND))
        .delay(30000)/*.method(DelayerBean.class , "computeDelayInMillis")*/.asyncDelayed().// delay should be asynchronously
        .setBody().header(EventConstants.BE_STATE_EVENT)// send original event
        .to("direct:requeue");// toendpoint: requeroute

    // For handling other exceptions
    onException(Exception.class)
    .log("EXCEPTION OCCURED.....   ->  \"${exception.message}\"")
        .setExchangePattern(ExchangePattern.InOnly)
        .bean(KafkaErrorHandlerBean.class, "handle")
        .handled(true);

    // Backend Events Route
    from(commonCamelConfig.getKafkaConsumerEndpoint())
     .routeId("BackendStateIncomingRoute")
            .id(routeId)
            .to("log:" + fqClassName + "?showAll=true&level=" + logLevel)
            .unmarshal(jdf)
            .bean(MandatoryFieldCheckerBean.class, "performNullCheck")
            // all context info must be present,if not, throw exception
            .bean(ValidateEventHandlerBean.class, "validateIncomingEvents")
            .choice()
            .when().simple("${in.header.isValidEvent} == true",Boolean.class)//enter if valid event(backendstate/backenddata)
                    .choice()
                        .when(header("BEStateEvent").isNotNull())
                            .bean(EventTransformer.class, "getBackendTransformedEvent")
                            .bean(PaymentsService.class, "processMessage")
                            .bean(TransitionalStateHandlerBean.class,"handle")
                            .bean(AMQPProducer.class, "sendEventToMQ")
                            .setExchangePattern(ExchangePattern.InOnly)
                        .otherwise()
                            .bean(EventTransformer.class, "getBackendTransformedEvent")
                            .bean(PaymentsService.class, "processMessage")
                    .endChoice()
            .setExchangePattern(ExchangePattern.InOnly)//acknowledge only valid events, doesnt expect a reply
            .endChoice()
            .end();

    //Reque the original event in case of Retryable Exceptions
    from("direct:requeue").routeId("BackendDirectRequeRoute")
    .bean(RequestRetryHandlerBean.class, "doRetry")
    .to(commonCamelConfig.getKafkaConsumerEndpoint())
    .end();

消费者endpoint的Kafka配置如下:

    public String getKafkaConsumerEndpoint() {
    return properties.getJmsKafkaBroker()
            + ":" + properties.getKafkaPaymentsOtpTopic()
            + "?brokers="+ properties.getBootstrapServers()
            + "&groupId="+ properties.getGroupId()
            + "&autoOffsetReset="+ properties.getAutoOffsetReset()
            + "&autoCommitEnable=true"
            + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
            + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer";
}

我对上述代码有两个疑问:

>

  • 在整个路由的哪个点,是kafka消费者提交偏移量,还是它是一个独立的任务,因为我没有修改自动提交间隔字段的默认值,即5秒,所以这是否意味着它将在每5秒后独立提交一次。

    我想在序列外异常的情况下处理请求场景,我这样做的方式是通过创建另一个具有延迟模式逻辑的endpoint,该逻辑将在最大请求时间后将错误消息发送到DLQ。上述逻辑中有哪些漏洞,是否有更好的方法来处理相同的问题?如果这个逻辑看起来不错,那么请参阅我使用asyncdelay()进行异步延迟,但它似乎不起作用,并且在请求之前阻止新消息。请帮助我实现异步延迟。


  • 共1个答案

    匿名用户

    1)如果您使用的是骆驼版

    autoComualCommit=false:关闭偏移量的自动提交,以便我们可以使用手动提交。lowManualCommit=true:打开手动提交,让我们可以访问KafkaManualCommit功能。下面是代码片段:

    KafkaManualCommit manual =
                            exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
                    if (manual != null) {
                        LOGGER.info("committing the offset manually");
    manual.commitSync();
                }
    

    2)在你的第二个问题中,似乎你想再次将消息放回kafka处理。但是从你的代码来看,似乎你对消费者和生产者使用了相同的endpoint。当你想在kafka中生成消息时,你需要指定消息的“主题”、“分区”和“键”,我在你的代码中没有看到这些。谈到循环漏洞,因为你再次将消息放在kafka中,如果消息损坏了怎么办,所以你会继续获得相同的异常并将相同的消息再次放回kafka。我建议在相同的路由中重试消息。下面是代码片段:

    onException(YourException.class)
                    .maximumRedeliveries(3) // You can call some method too
                    .redeliveryDelay(100) // You can call some method too
                    .onRedelivery(exchange -> {
                        int retryCount = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
                        log.debug("Recoverable exception occurred. Retried {} time " , retryCount);
                    })
                    .retryAttemptedLogLevel(LoggingLevel.DEBUG)
                    .to("someOtherRoute // Probably to error-topic