提问者:小点点

Kafka producer失败,myTopic的2条记录过期-1:120004自批量创建以来已过去毫秒


我有一个问题与Kafka生产者在生产中,我看到下面的错误。

发布失败,自批创建以来,myTopic-1:120004毫秒已过期2条记录[[org.apache.kafka.commun.errors.TimeoutException:自批创建之后,myTopic 1:120004毫秒已到期2条记录

Kafka代理是融合的5.3.2版本,Kafka客户端是apache 2.3.1。在我的代码中明确指定的Producer配置如下,其余为默认配置。

批量大小=102400linger.ms=100压缩类型=lz4 ack=所有

样本Java代码

ProducerRecord<String, String> rec = new ProducerRecord<String, String>("myTopic",1,"myKey","json-payload-here");
producer.send(rec, new ProducerCallback(jsonPayload));  

private class ProducerCallback implements Callback {

  private String _ME ="onCompletion";
  private String jsonPayload;

  public ProducerCallback(String jsonPayload) {
    this.jsonPayload = jsonPayload;
  }

  @Override
  public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    if (e == null) {
      LOG.logp(Level.FINEST, _CL, _ME, "Published kafka event "+jsonPayload);
    } else {
      //Note: Exception is logged here.
      LOG.log(Level.SEVERE, "Publish failed, "+e.getMessage(), e);
    }
  }
}

几个问题

    < li >生产中的负荷并不大,目前还算适中,后期可能会很重。我错过了一些生产者配置,以纠正上述问题? < li >假设批处理中有2个记录已过期,有没有办法在java中获取这些过期的记录,以便获取有效负载和密钥来重新发布它们?

谢谢,提前感谢您的帮助。


共1个答案

匿名用户

批处理将过期,因此不可以,除非您将数据保存在其他数据结构中,否则无法取回数据。

要实际发送批处理,可以减小批处理大小,也可以显式调用 producer.flush()。若要增加超时的持续时间,请使用 request.timeout.ms