Asked by: 小点点

Why do I get so many TCP ESTABLISHED connections when using a Kafka producer in a Java application? Can anyone explain this?


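I am using Apache kafka-client-3.2.0 (I also tried the latest Apache kafka-client-3.4.0 jar) to create a producer in Java. I have 5 topics, each with replication factor = 1 and partitions = 1, because I have only 1 broker. Of these 5 topics, 1 is used by the KafkaAppender in log4j2, and 4 are used by a timer task that periodically calls the producer to write to those topics. When I start the application, the number of TCP connections starts to grow. I have seen more than 300 established connections, and the count keeps increasing.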

Kafka versions: Kafka-2.13_2.8.0, Kafka-2.13_3.3.1, and Kafka-2.13_3.4.0

Below is the producer configuration I use:

    private KafkaProducer<String, String> producer;

    public KafkaProducer<String, String> getProducer() {
        return producer;
    }

    private KafkaProducer<String, String> createAndGetProducer(String acksConfig, int retryConfig) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        props.put(ProducerConfig.ACKS_CONFIG, acksConfig);
        props.put(ProducerConfig.RETRIES_CONFIG, retryConfig);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        return producer;
    }

Below is the method I call from the timer task to produce data:


    public static void writeToTopic(String topicName, String value) {
        logger.info("Writing into topic " + topicName);

        ProducerRecord<String, String> producerData = new ProducerRecord<String, String>(topicName, value);
        KafkaProducer<String, String> producer = KafkaConnectionManager.getConnection().getProducer();

        logger.info(producer.toString());
        KafkaConnectionManager.getConnection().getProducer().send(producerData);
        KafkaConnectionManager.getConnection().getProducer().flush();
    }

Here are the connections:

tcp6       0      0  X.X.X.X:9092       X.X.X.X:59604      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:48358      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:50668      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:59898      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:50626      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:59444      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:49716      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:61049      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:58516      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:51514      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:51892      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:50802      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:47106      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:60084      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:50588      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:50682      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:59814      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:50788      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:48554      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:51390      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:58576      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:50974      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:51504      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:47262      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:60022      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:50558      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:56118      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:51844      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:59712      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:51834      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:50626      ESTABLISHED 4369/java
tcp6       0      0  X.X.X.X:9092       X.X.X.X:59668      ESTABLISHED 4369/java

Below is the connection manager code:


    private static KafkaConnectionManager kafkaConnectionManager = null;

    public synchronized static KafkaConnectionManager getConnection() {
        return kafkaConnectionManager;
    }

    public synchronized static KafkaConnectionManager initialize(Properties kafkaProps, Logger logger, String hostIp) throws KafkaConnectionManagerException {
        if (kafkaConnectionManager == null) {
            synchronized (KafkaConnectionManager.class) {
                if (kafkaConnectionManager == null) {
                    kafkaConnectionManager = new KafkaConnectionManager(kafkaProps, logger, hostIp);
                }
            }
        }
        return kafkaConnectionManager;
    }

    private KafkaConnectionManager(Properties kafkaProps, Logger logger, String hostIp) throws KafkaConnectionManagerException {
        if (kafkaProps == null) {
            throw new KafkaConnectionManagerException("kafkaProps property is null");
        } else if (logger == null) {
            throw new KafkaConnectionManagerException("logger is null");
        } else if (hostIp == null) {
            throw new KafkaConnectionManagerException("hostIp is null, set the hostIp");
        }
        KafkaConnectionManager.kafkaProps = kafkaProps;
        this.logger = logger;
        this.hostIp = hostIp;
        brokers = kafkaProps.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).trim();
        logger.info("KAFKA brokers!!! = " + brokers);
        this.acksConfig = kafkaProps.getProperty(ProducerConfig.ACKS_CONFIG).trim();
        this.retryConfig = Integer.parseInt(kafkaProps.getProperty(ProducerConfig.RETRIES_CONFIG).trim());

        this.producer = createAndGetProducer(acksConfig, retryConfig);
        logger.info("Kafka Producer has been Initialized successfully");
    }

In the snippet above, the method "public synchronized static KafkaConnectionManager initialize" is called from the main method.

Below is the broker configuration I am using (the same for all the versions listed above):

broker.id=1
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://X.X.X.X:9092
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3 
num.io.threads=8 
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400 
socket.request.max.bytes=104857600

log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1


offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

log.flush.interval.messages=10000 
log.flush.interval.ms=1000

log.retention.hours=168 
log.retention.check.interval.ms=300000

zookeeper.connect=10.64.223.70:2181
zookeeper.connection.timeout.ms=18000

group.initial.rebalance.delay.ms=0 
max.connection.per.ip=100 
max.connections=100
listener.name.internal.max.connections=100
request.timeout.ms=180000
connections.max.idle.ms=300000

I am also using the Admin API to create the topics. It is called from the main method as well. Below is the code that creates topics with the Admin API:

    public void createTopics(Collection<NewTopic> newTopics, CreateTopicsOptions createTopicsOptions) {

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        AdminClient adminClient = KafkaAdminClient.create(props);

        adminClient.createTopics(newTopics, createTopicsOptions);
    }

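I have tried all the broker and producer configurations mentioned on the official Kafka site, except those for authentication; I am not using any authentication at the moment.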


1 answer

Anonymous user

The singleton is an anti-pattern, and it does not improve anything here.

You can construct a single `KafkaProducer` for the lifetime of the application and then simply reuse it (assuming its configuration does not need to change).
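A minimal sketch of that idea, assuming the kafka-clients library is on the classpath. The class name `TopicWriter` and the helper `producerProps` are hypothetical and not part of the question's code; the configuration keys are the ones the question already uses. The producer is passed in through the constructor, so the code that starts the application decides how many instances exist (ideally one), and every caller shares it.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical helper: wraps the one producer that the application owns.
final class TopicWriter implements AutoCloseable {
    private final Producer<String, String> producer;

    // The producer is injected rather than looked up through a static singleton.
    TopicWriter(Producer<String, String> producer) {
        this.producer = producer;
    }

    // Builds the same settings as in the question; all values are given as strings.
    static Properties producerProps(String brokers, String acks, int retries) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        props.put(ProducerConfig.ACKS_CONFIG, acks);
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(retries));
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return props;
    }

    // send() is asynchronous; the shared producer batches records internally.
    void write(String topic, String value) {
        producer.send(new ProducerRecord<>(topic, value));
    }

    // Call once at shutdown: closing the producer releases its sockets and threads.
    @Override
    public void close() {
        producer.close();
    }
}
```

In `main`, one would create `new TopicWriter(new KafkaProducer<>(TopicWriter.producerProps(brokers, "all", 3)))` once (the argument values here are placeholders), hand that object to the timer tasks, and close it when the application stops. Accepting the `Producer` interface also lets tests substitute `MockProducer` for a real broker connection.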

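The same applies to the admin client: create one and pass it as a parameter to the methods/classes that need it, instead of constructing a new one on every method call.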
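For the admin client, a sketch along the same lines might look like this. `TopicCreator`, `adminProps`, and `createOnce` are hypothetical names, and the code assumes a kafka-clients version that provides the `Admin` interface (2.4 or later). The `createTopics` method in the question builds a new client on every call and never closes it, so each call leaves a client with its own connections behind; here the client is passed in, and the code that created it closes it.

```java
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;

// Hypothetical helper: uses an admin client supplied by the caller.
final class TopicCreator {
    private final Admin admin;

    TopicCreator(Admin admin) {
        this.admin = admin;
    }

    // Admin clients only need the bootstrap servers for this use case.
    static Properties adminProps(String brokers) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        return props;
    }

    // Blocks until the broker has answered, so failures surface here.
    void createTopics(Collection<NewTopic> topics, CreateTopicsOptions options)
            throws InterruptedException, ExecutionException {
        admin.createTopics(topics, options).all().get();
    }

    // One-shot usage: try-with-resources closes the client and its connections.
    static void createOnce(String brokers, Collection<NewTopic> topics)
            throws InterruptedException, ExecutionException {
        try (Admin admin = Admin.create(adminProps(brokers))) {
            new TopicCreator(admin).createTopics(topics, new CreateTopicsOptions());
        }
    }
}
```

If topics are only created at startup, the one-shot form is probably enough; if admin operations recur, keep a single `Admin` instance alive and close it at shutdown instead.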