提问者:小点点

高效地从压缩、分块HTTP流中读取行


我编写了一个HTTP服务器,它生成由JSON结构事件组成的无尽HTTP流。类似于Twitter的流媒体API。这些事件由\n分隔(根据服务器发送的Content-Type事件:text/event-stream),并且长度可以变化。

回应是

  • chunked(HTTP1.1传输编码:chunked)由于无尽的流
  • 压缩(Content-Encode: gzip)以节省带宽。

我想尽快在Python消耗这些生产线,并尽可能节约资源,而不是重新发明轮子。

由于我目前正在使用python-request,你知道如何让它工作吗?如果你认为python-request在这里无济于事,我对替代框架/库完全开放。

我目前的实现是基于请求,并使用iter_lines(…)来接收行。但是chunk_size参数很棘手。如果设置为1,它非常cpu密集,因为有些事件可能是几千字节。如果设置为高于1的任何值,一些事件会卡住,直到下一个到达,整个缓冲区“被填满”。事件之间的时间可能是几秒钟。我期望chunk_size是某种“接收的最大字节数”,就像unix的recv(…)一样。相应的手册页说:

接收呼叫通常返回任何可用数据,最多不超过请求的数量,而不是等待收到请求的全部数量。

但这显然不是请求库中的工作方式。他们或多或少地将其用作“要接收的确切字节数”。在查看他们的源代码时,我无法确定哪个部分对此负责。也许是httplib的Response或ssl的SSLSocket。

作为一种解决方法,我尝试将服务器上的行填充为chunk大小的倍数。但是请求库中的chunk大小用于从压缩的响应流中获取字节。因此,除非我可以填充我的行,使其压缩的字节序列是chunk大小的倍数,否则这将无法工作。但这似乎太hacky了。

我读到Twisted可用于客户端上超文本传输协议流的非阻塞、非缓冲处理,但我只在服务器上找到了创建流响应的代码。


共2个答案

匿名用户

您的iter_lines()调用被阻塞不是请求的错误。

响应。iter_lines()方法调用响应。iter_content(),它调用urllib3HTTPResponse.stream(),它调用HTTPResponse.read()

这些调用传递一个chunk size,这是作为self传递给套接字的。_fp.read(amt)。这是有问题的调用,因为self。_fp是由socket. makefile()生成的文件对象(由httplib模块完成);这个.read()调用将阻塞,直到读取amt(压缩)字节。

这个低级套接字文件对象确实支持. readline()调用,这将更有效地工作,但是urllib3在处理压缩数据时不能使用此调用;行终止符在压缩流中不可见。

不幸的是,urllib3不会调用self。_fp. readline()当响应没有被压缩时;调用的结构很难传递,你想在行缓冲模式下读取,而不是在块缓冲模式下读取。

我必须说HTTP不是用于流式事件的最佳协议;我会为此使用不同的协议。想到Websocket,或者您特定用例的自定义协议。

匿名用户

多亏了Martijn Pieters的回答,我不再处理python请求行为,而是寻找一种完全不同的方法。

我最终使用了pyCurl。您可以将其类似于select recv循环使用,而无需像Tornado等中那样反转控制流并将控制权交给专用的IO循环。这样就可以轻松使用生成器,在新行到达时立即生成新行-而无需在中间层中进一步缓冲,从而可能引入延迟或运行IO循环的额外线程。

同时,它足够高级,您无需担心分块传输编码、SSL加密或gzip压缩。

这是我的旧代码,其中chunk_size=1导致45%CPU负载和chunk_size

import requests
class RequestsHTTPStream(object):
    def __init__(self, url):
        self.url = url

    def iter_lines(self):
        headers = {'Cache-Control':'no-cache',
                   'Accept': 'text/event-stream',
                   'Accept-Encoding': 'gzip'}
        response = requests.get(self.url, stream=True, headers=headers)
        return response.iter_lines(chunk_size=1)

这是我基于pyCurl的新代码:(不幸的是,curl_easy_*style完全执行块,这使得在不使用线程的情况下很难在两者之间产生行。因此我使用curl_multi_*方法)

import pycurl
import urllib2
import httplib
import StringIO

class CurlHTTPStream(object):
    def __init__(self, url):
        self.url = url
        self.received_buffer = StringIO.StringIO()

        self.curl = pycurl.Curl()
        self.curl.setopt(pycurl.URL, url)
        self.curl.setopt(pycurl.HTTPHEADER, ['Cache-Control: no-cache', 'Accept: text/event-stream'])
        self.curl.setopt(pycurl.ENCODING, 'gzip')
        self.curl.setopt(pycurl.CONNECTTIMEOUT, 5)
        self.curl.setopt(pycurl.WRITEFUNCTION, self.received_buffer.write)

        self.curlmulti = pycurl.CurlMulti()
        self.curlmulti.add_handle(self.curl)

        self.status_code = 0

    SELECT_TIMEOUT = 10

    def _any_data_received(self):
        return self.received_buffer.tell() != 0

    def _get_received_data(self):
        result = self.received_buffer.getvalue()
        self.received_buffer.truncate(0)
        self.received_buffer.seek(0)
        return result

    def _check_status_code(self):
        if self.status_code == 0:
            self.status_code = self.curl.getinfo(pycurl.HTTP_CODE)
        if self.status_code != 0 and self.status_code != httplib.OK:
            raise urllib2.HTTPError(self.url, self.status_code, None, None, None)

    def _perform_on_curl(self):
        while True:
            ret, num_handles = self.curlmulti.perform()
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break
        return num_handles

    def _iter_chunks(self):
        while True:
            remaining = self._perform_on_curl()
            if self._any_data_received():
                self._check_status_code()
                yield self._get_received_data()
            if remaining == 0:
                break
            self.curlmulti.select(self.SELECT_TIMEOUT)

        self._check_status_code()
        self._check_curl_errors()

    def _check_curl_errors(self):
        for f in self.curlmulti.info_read()[2]:
            raise pycurl.error(*f[1:])

    def iter_lines(self):
        chunks = self._iter_chunks()
        return self._split_lines_from_chunks(chunks)

    @staticmethod
    def _split_lines_from_chunks(chunks):
        #same behaviour as requests' Response.iter_lines(...)

        pending = None
        for chunk in chunks:

            if pending is not None:
                chunk = pending + chunk
            lines = chunk.splitlines()

            if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]:
                pending = lines.pop()
            else:
                pending = None

            for line in lines:
                yield line

        if pending is not None:
            yield pending

此代码尝试从传入流中获取尽可能多的字节,如果只有几个字节,则不会不必要地阻塞。相比之下,CPU负载约为0.2%