我有 6 个用于某个主题的分区,以及 4 个从该主题消费的消费者。生产者以循环方式生产到分区。这 4 个消费者属于同一消费者组。
通过一些负载测试,我可以看到其中 2 个分区的消耗速度非常慢,而其他分区几乎总是空的。我想尽可能提高我的吞吐量。
您使用的是哪个Kafka版本?
您的生产者似乎没有使用有效的分区方法。
您可以使用高效的哈希算法编写自定义分区,该算法平均分配消息,并为消费者提供并行消费消息的公平机会
许多因素会影响与KAFKA经纪人相关的客户(生产者/消费者)的整体表现。首先,我不确定你是如何运行你的消费者实例的,无论是4个实例在4个独立的服务器上运行,还是4个实例通过任何IDE工具加载测试本身。你可以在这里更好地澄清。此外,您的消费者实现是什么样子的。它只是阅读主题并将其写入控制台,还是执行连接到任何后端系统的全面业务功能。请确认。
如果存在一个密钥,并且使用了默认的分区器,Kafka将对该密钥进行哈希运算,并使用结果将消息映射到特定的分区。只有当主题中的分区数量不变时,键到分区的映射才是一致的。
您可以更改此行为,实现客户分区程序
除非实现了多线程使用者,否则无法根据吞吐量动态增加使用者。您可以阅读有关Java Executor Service的更多信息,参考:https://dzone.com/articles/kafka-consumer-and-multi-threading。您的使用者实现必须具有如下内容。因此,您应该有一个轮询记录数的计数器,如果它超过您之后的阈值,那么您可以实例化 ExecutorService 以添加更多实例。
私有列表执行器 = 新的数组列表() ;
@Override
public void run(String... args) throws Exception {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
executors.forEach(exe -> {
exe.shutdown();
try {
if (!exe.awaitTermination(10000, TimeUnit.MILLISECONDS)) {
exe.shutdownNow();
}
} catch (InterruptedException e) {
exe.shutdownNow();
}
int instances = <<number of instances>>;
ExecutorService executor = Executors.newFixedThreadPool(instances);
for (int i=0; i < instances; i++) {
executor.execute(<<Consumer Implemenation class>>);
executors.add(executor);
}
}