我有一个问题与Kafka生产者在生产中,我看到下面的错误。
发布失败,自批创建以来,myTopic-1:120004毫秒已过期2条记录[[org.apache.kafka.commun.errors.TimeoutException:自批创建之后,myTopic 1:120004毫秒已到期2条记录
Kafka代理是融合的5.3.2版本,Kafka客户端是apache 2.3.1。在我的代码中明确指定的Producer配置如下,其余为默认配置。
批量大小=102400linger.ms=100压缩类型=lz4 ack=所有
样本Java代码
ProducerRecord<String, String> rec = new ProducerRecord<String, String>("myTopic",1,"myKey","json-payload-here");
producer.send(rec, new ProducerCallback(jsonPayload));
private class ProducerCallback implements Callback {
private String _ME ="onCompletion";
private String jsonPayload;
public ProducerCallback(String jsonPayload) {
this.jsonPayload = jsonPayload;
}
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
LOG.logp(Level.FINEST, _CL, _ME, "Published kafka event "+jsonPayload);
} else {
//Note: Exception is logged here.
LOG.log(Level.SEVERE, "Publish failed, "+e.getMessage(), e);
}
}
}
几个问题
谢谢,提前感谢您的帮助。
批处理将过期,因此不可以,除非您将数据保存在其他数据结构中,否则无法取回数据。
要实际发送批处理,可以减小批处理大小,也可以显式调用 producer.flush()。
若要增加超时的持续时间,请使用 request.timeout.ms
。