提问者:小点点

如何在Kafka中使用恒定数量的分区和动态数量的消费者来提高吞吐量


我有 6 个用于某个主题的分区,以及 4 个从该主题消费的消费者。生产者以循环方式生产到分区。这 4 个消费者属于同一消费者组。

通过一些负载测试,我可以看到其中 2 个分区的消耗速度非常慢,而其他分区几乎总是空的。我想尽可能提高我的吞吐量。

    < Li > Kafka的默认分区分配策略是什么? < li >如果负载在某个时候增加,我希望将我的用户扩展到6个(与分区数量相同,因此用户与分区的比例为1:1)。在4个消费者的场景中,为了获得尽可能好的吞吐量,我应该限制我的生产者只生产4个分区,直到我增加了我的消费者的数量吗?

共2个答案

匿名用户

您使用的是哪个Kafka版本?

您的生产者似乎没有使用有效的分区方法。

  • 检查生产者是否生成了类似的密钥?或者如果它正在生成空键?

您可以使用高效的哈希算法编写自定义分区,该算法平均分配消息,并为消费者提供并行消费消息的公平机会

匿名用户

许多因素会影响与KAFKA经纪人相关的客户(生产者/消费者)的整体表现。首先,我不确定你是如何运行你的消费者实例的,无论是4个实例在4个独立的服务器上运行,还是4个实例通过任何IDE工具加载测试本身。你可以在这里更好地澄清。此外,您的消费者实现是什么样子的。它只是阅读主题并将其写入控制台,还是执行连接到任何后端系统的全面业务功能。请确认。

如果存在一个密钥,并且使用了默认的分区器,Kafka将对该密钥进行哈希运算,并使用结果将消息映射到特定的分区。只有当主题中的分区数量不变时,键到分区的映射才是一致的。

您可以更改此行为,实现客户分区程序

除非实现了多线程使用者,否则无法根据吞吐量动态增加使用者。您可以阅读有关Java Executor Service的更多信息,参考:https://dzone.com/articles/kafka-consumer-and-multi-threading。您的使用者实现必须具有如下内容。因此,您应该有一个轮询记录数的计数器,如果它超过您之后的阈值,那么您可以实例化 ExecutorService 以添加更多实例。

私有列表执行器 = 新的数组列表() ;

@Override
public void run(String... args) throws Exception {
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            executors.forEach(exe -> {
                exe.shutdown();
                try {
                    if (!exe.awaitTermination(10000, TimeUnit.MILLISECONDS)) {
                        exe.shutdownNow();
                    }
                } catch (InterruptedException e) {
                        exe.shutdownNow();
                        }

                        int instances = <<number of instances>>;
                        ExecutorService executor = Executors.newFixedThreadPool(instances);
                        for (int i=0; i < instances; i++) {
    executor.execute(<<Consumer Implemenation class>>);
    executors.add(executor);
    }
    }