我编写了一个HTTP服务器,它生成由JSON结构事件组成的无尽HTTP流。类似于Twitter的流媒体API。这些事件由\n
分隔(根据服务器发送的Content-Type事件:text/event-stream),并且长度可以变化。
回应是
我想尽快在Python消耗这些生产线,并尽可能节约资源,而不是重新发明轮子。
由于我目前正在使用python-request,你知道如何让它工作吗?如果你认为python-request在这里无济于事,我对替代框架/库完全开放。
我目前的实现是基于请求,并使用iter_lines(…)
来接收行。但是chunk_size
参数很棘手。如果设置为1
,它非常cpu密集,因为有些事件可能是几千字节。如果设置为高于1的任何值,一些事件会卡住,直到下一个到达,整个缓冲区“被填满”。事件之间的时间可能是几秒钟。我期望chunk_size
是某种“接收的最大字节数”,就像unix的recv(…)
一样。相应的手册页说:
接收呼叫通常返回任何可用数据,最多不超过请求的数量,而不是等待收到请求的全部数量。
但这显然不是请求库中的工作方式。他们或多或少地将其用作“要接收的确切字节数”。在查看他们的源代码时,我无法确定哪个部分对此负责。也许是httplib的Response或ssl的SSLSocket。
作为一种解决方法,我尝试将服务器上的行填充为chunk大小的倍数。但是请求库中的chunk大小用于从压缩的响应流中获取字节。因此,除非我可以填充我的行,使其压缩的字节序列是chunk大小的倍数,否则这将无法工作。但这似乎太hacky了。
我读到Twisted可用于客户端上超文本传输协议流的非阻塞、非缓冲处理,但我只在服务器上找到了创建流响应的代码。
您的iter_lines()
调用被阻塞不是请求
的错误。
响应。iter_lines()
方法调用响应。iter_content()
,它调用urllib3
的HTTPResponse.stream()
,它调用HTTPResponse.read()
。
这些调用传递一个chunk size,这是作为self传递给套接字的。_fp.read(amt)
。这是有问题的调用,因为self。_fp
是由socket. makefile()
生成的文件对象(由httplib
模块完成);这个.read()
调用将阻塞,直到读取amt
(压缩)字节。
这个低级套接字文件对象确实支持. readline()
调用,这将更有效地工作,但是urllib3
在处理压缩数据时不能使用此调用;行终止符在压缩流中不可见。
不幸的是,urllib3
不会调用self。_fp. readline()
当响应没有被压缩时;调用的结构很难传递,你想在行缓冲模式下读取,而不是在块缓冲模式下读取。
我必须说HTTP不是用于流式事件的最佳协议;我会为此使用不同的协议。想到Websocket,或者您特定用例的自定义协议。
多亏了Martijn Pieters的回答,我不再处理python请求行为,而是寻找一种完全不同的方法。
我最终使用了pyCurl。您可以将其类似于select recv循环使用,而无需像Tornado等中那样反转控制流并将控制权交给专用的IO循环。这样就可以轻松使用生成器,在新行到达时立即生成新行-而无需在中间层中进一步缓冲,从而可能引入延迟或运行IO循环的额外线程。
同时,它足够高级,您无需担心分块传输编码、SSL加密或gzip压缩。
这是我的旧代码,其中chunk_size
=1导致45%CPU负载和chunk_size
import requests
class RequestsHTTPStream(object):
def __init__(self, url):
self.url = url
def iter_lines(self):
headers = {'Cache-Control':'no-cache',
'Accept': 'text/event-stream',
'Accept-Encoding': 'gzip'}
response = requests.get(self.url, stream=True, headers=headers)
return response.iter_lines(chunk_size=1)
这是我基于pyCurl的新代码:(不幸的是,curl_easy_*style完全执行
块,这使得在不使用线程的情况下很难在两者之间产生行。因此我使用curl_multi_*方法)
import pycurl
import urllib2
import httplib
import StringIO
class CurlHTTPStream(object):
def __init__(self, url):
self.url = url
self.received_buffer = StringIO.StringIO()
self.curl = pycurl.Curl()
self.curl.setopt(pycurl.URL, url)
self.curl.setopt(pycurl.HTTPHEADER, ['Cache-Control: no-cache', 'Accept: text/event-stream'])
self.curl.setopt(pycurl.ENCODING, 'gzip')
self.curl.setopt(pycurl.CONNECTTIMEOUT, 5)
self.curl.setopt(pycurl.WRITEFUNCTION, self.received_buffer.write)
self.curlmulti = pycurl.CurlMulti()
self.curlmulti.add_handle(self.curl)
self.status_code = 0
SELECT_TIMEOUT = 10
def _any_data_received(self):
return self.received_buffer.tell() != 0
def _get_received_data(self):
result = self.received_buffer.getvalue()
self.received_buffer.truncate(0)
self.received_buffer.seek(0)
return result
def _check_status_code(self):
if self.status_code == 0:
self.status_code = self.curl.getinfo(pycurl.HTTP_CODE)
if self.status_code != 0 and self.status_code != httplib.OK:
raise urllib2.HTTPError(self.url, self.status_code, None, None, None)
def _perform_on_curl(self):
while True:
ret, num_handles = self.curlmulti.perform()
if ret != pycurl.E_CALL_MULTI_PERFORM:
break
return num_handles
def _iter_chunks(self):
while True:
remaining = self._perform_on_curl()
if self._any_data_received():
self._check_status_code()
yield self._get_received_data()
if remaining == 0:
break
self.curlmulti.select(self.SELECT_TIMEOUT)
self._check_status_code()
self._check_curl_errors()
def _check_curl_errors(self):
for f in self.curlmulti.info_read()[2]:
raise pycurl.error(*f[1:])
def iter_lines(self):
chunks = self._iter_chunks()
return self._split_lines_from_chunks(chunks)
@staticmethod
def _split_lines_from_chunks(chunks):
#same behaviour as requests' Response.iter_lines(...)
pending = None
for chunk in chunks:
if pending is not None:
chunk = pending + chunk
lines = chunk.splitlines()
if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]:
pending = lines.pop()
else:
pending = None
for line in lines:
yield line
if pending is not None:
yield pending
此代码尝试从传入流中获取尽可能多的字节,如果只有几个字节,则不会不必要地阻塞。相比之下,CPU负载约为0.2%