我有一个本地kafka正在使用以下docker-compose.yml
version: '2'
services:
zookeeper:
image: "confluentinc/cp-zookeeper:5.0.1"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: "confluentinc/cp-enterprise-kafka:5.0.1"
ports:
- '9092:9092'
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
尝试在Scala中使用kafka客户端2.1.0运行基本的创建主题:
val props = new Properties()
props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
val adminClient: AdminClient = AdminClient.create(props)
val newTopic = new NewTopic("test", 1, 1.toShort)
val topicsF = adminClient.createTopics(List(newTopic).asJavaCollection)
val result = topicsF.all().get()
但是过了一段时间,我得到了:
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
我可以使用命令行创建一个主题:
kafka-topics --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic test
Created topic "test".
Kafka AdminClient API超时等待节点分配描述了使用Java的类似问题,但评论表明系统重启修复了问题,而我这边的情况并非如此。
如果您在 Docker(或类似)中运行 Kafka,则需要正确配置侦听器。本文对此进行了详细描述。
这里有一个Docker Compose的示例,您可以使用它从主机访问Kafka。
声明:文章是我写的:)
正如@suh指出的那样。
一种更简单的方法是在第< code >行取消对Kafka < code > server . properties 的注释:listeners = PLAINTEXT://localhost:9092 ,这样应该可以工作。
我认为本地主机
是问题所在。在引导服务器
属性中,使用您在撰写文件中定义的播发主机 (192.168.99.100),而不是应该可以工作的 localhost
。