提问者:小点点

使用Kafka在应用程序上打开太多文件时出错


我正在使用Kafka和Spark Streaming构建一个应用程序。输入数据来自第三部分流媒体,并在kafka主题上发布。这段代码显示了流代理模块:这是我从流媒体中获取结果以及如何将它们发送到KafkaPublisher的方式(它只显示了一个草图):

def on_result_response(self,*args):
    self.kafkaPublisher.pushMessage(str(args[0]))

KafkaPublisher是通过以下两种方法实现的:

class KafkaPublisher:

def __init__(self,address,port,topic):
    self.kafka = KafkaClient(str(address)+":"+str(port))
    self.producer = SimpleProducer(self.kafka)
    self.topic=topic



def pushMessage(self,message):
    self.producer.send_messages(self.topic, message)
    self.producer = SimpleProducer(self.kafka, async=True)

该应用程序是由这个主启动的:

from StreamProxy import StreamProxy


streamProxy=StreamProxy("localhost",9092,"task1")
streamProxy.getStreaming(20)  #seconds of streaming

在一些批次处理作业(10秒左右)之后,它会启动以下异常:

线程Thread-2354中的异常:Traceback(最近一次调用是最后一次):File"/usr/lib/python2.7/threading.py",第801行,在__bootstrap_innerFile"/usr/lib/python2.7/threading.py",第754行,在run File"/usr/local/lib/python2.7/dist包/kafka/生产者/base.py",第164行,在_send_upstreamFile"/usr/local/lib/python2.7/dist包/kafka/client.py",第649行,在send_produce_requestFile"/usr/local/lib/python2.7/dist包/kafka/client.py",第253行,在_send_broker_aware_requestFile"/usr/local/lib/python2.7/dist包/kafka/client.py",第74行,在_get_connFile"/usr/local/lib/python2.7/dist包/kafka/conn.py",第236行,在连接错误:[Errno 24]太多打开

线程Thread-2355中的异常:Traceback(最近一次调用是最后一次):File"/usr/lib/python2.7/threading.py",第801行,在__bootstrap_innerFile"/usr/lib/python2.7/threading.py",第754行,在run File"/usr/local/lib/python2.7/dist包/kafka/生产者/base.py",第164行,在_send_upstreamFile"/usr/local/lib/python2.7/dist包/kafka/client.py",第649行,在send_produce_requestFile"/usr/local/lib/python2.7/dist包/kafka/client.py",第253行,在_send_broker_aware_requestFile"/usr/local/lib/python2.7/dist包/kafka/client.py",第74行,在_get_connFile"/usr/local/lib/python2.7/dist包/kafka/conn.py",第236行,在连接错误:[Errno 24]太多打开

请注意,同一消息有许多不同的异常,问题肯定是发布者方面的。


共1个答案

匿名用户

尝试删除该行:

self.producer = SimpleProducer(self.kafka, async=True)