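I am using Apache kafka-client-3.2.0 (and have also tried the latest Apache kafka-client-3.4.0 jar) to create a producer in Java. I have 5 topics, each with replication factor = 1 and partitions = 1, because I have only 1 broker. Of these 5 topics, 1 is used by the KafkaAppender in log4j2 and 4 are used by a timer task that periodically calls the producer to write to those topics. When I start the application, the number of TCP connections starts growing. More than 300 established connections have been observed, and the number keeps increasing.
Kafka versions: Kafka-2.13_2.8.0, Kafka-2.13_3.3.1 and Kafka-2.13_3.4.0
Below is the producer configuration I am using: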
private KafkaProducer<String, String> producer;

public KafkaProducer<String, String> getProducer() {
    return producer;
}

private KafkaProducer<String, String> createAndGetProducer(String acksConfig, int retryConfig) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    props.put(ProducerConfig.ACKS_CONFIG, acksConfig);
    props.put(ProducerConfig.RETRIES_CONFIG, retryConfig);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    return producer;
}
Below is the method I call from the timer task to produce data:
public static void writeToTopic(String topicName, String value) {
    logger.info("Writing into topic " + topicName);
    ProducerRecord<String, String> producerData = new ProducerRecord<String, String>(topicName, value);
    KafkaProducer<String, String> producer = KafkaConnectionManager.getConnection().getProducer();
    logger.info(producer.toString());
    KafkaConnectionManager.getConnection().getProducer().send(producerData);
    KafkaConnectionManager.getConnection().getProducer().flush();
}
Below are the connections:
tcp6 0 0 X.X.X.X:9092 X.X.X.X:59604 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:48358 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:50668 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:59898 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:50626 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:59444 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:49716 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:61049 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:58516 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:51514 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:51892 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:50802 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:47106 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:60084 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:50588 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:50682 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:59814 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:50788 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:48554 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:51390 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:58576 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:50974 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:51504 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:47262 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:60022 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:50558 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:56118 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:51844 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:59712 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:51834 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:50626 ESTABLISHED 4369/java
tcp6 0 0 X.X.X.X:9092 X.X.X.X:59668 ESTABLISHED 4369/java
Below is the connection manager code:
private static KafkaConnectionManager kafkaConnectionManager = null;

public synchronized static KafkaConnectionManager getConnection() {
    return kafkaConnectionManager;
}

public synchronized static KafkaConnectionManager initialize(Properties kafkaProps, Logger logger, String hostIp) throws KafkaConnectionManagerException {
    if (kafkaConnectionManager == null) {
        synchronized (KafkaConnectionManager.class) {
            if (kafkaConnectionManager == null) {
                kafkaConnectionManager = new KafkaConnectionManager(kafkaProps, logger, hostIp);
            }
        }
    }
    return kafkaConnectionManager;
}

private KafkaConnectionManager(Properties kafkaProps, Logger logger, String hostIp) throws KafkaConnectionManagerException {
    if (kafkaProps == null) {
        throw new KafkaConnectionManagerException("kafkaProps property is null");
    } else if (logger == null) {
        throw new KafkaConnectionManagerException("logger is null");
    } else if (hostIp == null) {
        throw new KafkaConnectionManagerException("hostIp is null, set the hostIp");
    }
    KafkaConnectionManager.kafkaProps = kafkaProps;
    this.logger = logger;
    this.hostIp = hostIp;
    brokers = kafkaProps.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).trim();
    logger.info("KAFKA brokers!!! = " + brokers);
    this.acksConfig = kafkaProps.getProperty(ProducerConfig.ACKS_CONFIG).trim();
    this.retryConfig = Integer.parseInt(kafkaProps.getProperty(ProducerConfig.RETRIES_CONFIG).trim());
    this.producer = createAndGetProducer(acksConfig, retryConfig);
    logger.info("Kafka Producer has been Initialized successfully");
}
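In the snippet above, the method "public synchronized static KafkaConnectionManager initialize" is called from the main method.
Below is the broker configuration I am using (the same for all the versions listed above):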
broker.id=1
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://X.X.X.X:9092
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=10.64.223.70:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
max.connection.per.ip=100
max.connections=100
listener.name.internal.max.connections=100
request.timeout.ms=180000
connections.max.idle.ms=300000
I am also using the Admin API to create topics. It is also called from the main method. Below is the code that creates topics using the Admin API:
public void createTopics(Collection<NewTopic> newTopics, CreateTopicsOptions createTopicsOptions) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    AdminClient adminClient = KafkaAdminClient.create(props);
    adminClient.createTopics(newTopics, createTopicsOptions);
}
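I have tried all the broker and producer configurations mentioned on the official Kafka site, except authentication; I am not using any authentication at the moment.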
The singleton is an anti-pattern and does not improve anything here.
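You can construct one KafkaProducer for the lifetime of the application and simply reuse it (assuming its configuration does not need to change).
The same applies to the admin client: create one and pass it as a parameter to the methods/classes that need it, rather than constructing a new one on every method call.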
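To make the difference concrete, here is a minimal, Kafka-free sketch of the two patterns. FakeClient is a hypothetical stand-in (it is not a Kafka class) that only counts "connections" instead of opening sockets; the method names are also made up for illustration. The leaky variant mirrors the shape of createTopics in the question, which builds a new client on each call and never closes it. Whether that is what drives the connection growth described above is not established here.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class ClientReuseSketch {

    /** Hypothetical stand-in for a Kafka client; counts "connections" instead of opening sockets. */
    static final class FakeClient implements AutoCloseable {
        static final AtomicInteger OPEN = new AtomicInteger();

        FakeClient() {
            OPEN.incrementAndGet(); // constructing a client acquires a connection
        }

        void createTopic(String name) {
            // A real client would send a request here; the stand-in does nothing.
        }

        @Override
        public void close() {
            OPEN.decrementAndGet(); // closing the client releases the connection
        }
    }

    /** New client on every call, never closed. Returns the number of connections left open. */
    static int leakyCalls(int calls) {
        FakeClient.OPEN.set(0);
        for (int i = 0; i < calls; i++) {
            FakeClient client = new FakeClient();
            client.createTopic("topic-" + i);
        }
        return FakeClient.OPEN.get();
    }

    /** Receives the client as a parameter instead of constructing its own. */
    static void createTopic(FakeClient client, String name) {
        client.createTopic(name);
    }

    /** One client for all calls, closed by try-with-resources. Returns the peak number of open connections. */
    static int sharedClientCalls(int calls) {
        FakeClient.OPEN.set(0);
        int peak = 0;
        try (FakeClient client = new FakeClient()) {
            for (int i = 0; i < calls; i++) {
                createTopic(client, "topic-" + i);
                peak = Math.max(peak, FakeClient.OPEN.get());
            }
        }
        return peak;
    }

    public static void main(String[] args) {
        System.out.println("leaky: " + leakyCalls(5) + " connections left open");
        System.out.println("shared: peak " + sharedClientCalls(5)
                + ", left open " + FakeClient.OPEN.get());
    }
}
```

With the real classes the same shape applies: KafkaProducer and AdminClient both have a close() method, so a short-lived admin client can be wrapped in try-with-resources, while a long-lived producer is created once at startup and closed on shutdown.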