Java NIO 非阻塞服务器

即使你了解了Java NIO非阻塞功能如何工作(Selector,Channel, Buffer等),设计一个无阻塞服务器仍然很难。与阻塞IO相比,非阻塞IO包含多个挑战。这份非阻塞服务器教程将讨论非阻塞服务器的主要挑战,并为它们描述一些潜在的解决方案。

本教程中描述的思想是围绕Java NIO设计的。但是,我相信这些想法只要具有某种类似的Selector构造就可以在其他语言中重用。据我所知,这样的结构是由底层操作系统提供的,因此您很有可能也可以使用其他语言来访问它。

1 非阻塞服务器-GitHub仓库

我已经创建了本教程中提出的想法的简单概念验证,并将其放在GitHub仓库中供您查看。这是GitHub仓库:

https://github.com/jjenkov/java-nio-server

2 无阻塞IO管道

甲非阻塞IO管道是处理非阻塞IO组件的链。这包括以非阻塞方式读取和写入IO。这是简化的无阻塞IO管道的说明:

组件使用选择器检查通道何时 有要读取的数据。然后,组件读取输入数据,并根据输入生成一些输出。输出将Channel再次写入。

无阻塞IO管道不需要读取和写入数据。一些管道可能仅读取数据,而某些管道可能仅写入数据。

上图仅显示了一个组件。无阻塞IO管道可能具有多个组件处理传入数据。无阻塞IO管道的长度取决于管道需要执行的操作。

无阻塞的IO管道也可能同时从多个Channels中读取。例如,从多个SocketChannels读取数据。

上图中的控制流程也得到了简化。它是启动数据从读出部件Channel通过Selector。这不是Channel说将这些数据推入Selector,并从那里到组件,即使是上图显示的内容。

3 非阻塞与阻塞IO管道

非阻塞和阻塞IO管道之间的最大区别是如何从底层Channel(套接字或文件)读取数据 。

IO管道通常从某个流(从套接字或文件)读取数据,并将该数据拆分为一致的消息。这类似于将数据流分解为令牌以使用令牌生成器进行解析。相反,您将数据流分解为更大的消息。我将调用将该流分解为消息的组件,以供 Message Reader使用。这是消息阅读器将流分成消息的说明:

阻塞的IO管道可以使用InputStream-like接口,可以一次从底层读取一个字节Channel,并且InputStream-like接口可以阻塞直到有数据准备读取为止。这导致阻塞的消息阅读器实施。

对流使用阻塞IO接口可以大大简化消息阅读器的实现。阻塞的消息阅读器永远不必处理从流中未读取任何数据,或者仅从流中读取了部分消息并且以后需要恢复消息解析的情况。

同样,阻塞消息编写器(将消息写入流的组件)永远不必处理仅写入消息的一部分且稍后必须恢复消息写入的情况。

3.1 阻止IO管道的缺点

尽管阻塞消息阅读器更易于实现,但是它具有一个不幸的缺点,即需要为每个需要拆分为消息的流分配一个单独的线程。之所以需要这样做,是因为每个流的IO接口都会阻塞,直到有一些数据要从中读取为止。这意味着单个线程无法尝试从一个流中读取,如果没有数据,则从另一个流中读取。一旦线程尝试从流中读取数据,线程就会阻塞,直到实际上有一些数据要读取为止。

如果IO管道是必须处理大量并发连接的服务器的一部分,则该服务器将为每个活动的传入连接需要一个线程。如果服务器在任何时候都只有几百个并发连接,那么这可能不是问题。但是,如果服务器具有数百万个并发连接,则这种设计的扩展性就不太好。每个线程将为其堆栈占用320K(32位JVM)和1024K(64位JVM)内存。因此,1.000.000线程将占用1 TB内存!那就是在服务器使用任何内存来处理传入消息之前(例如,为消息处理期间使用的对象分配的内存)。

为了减少线程数量,许多服务器使用一种设计,其中服务器保留一个线程池(例如100个),该线程池一次从入站连接中读取消息。入站连接保留在队列中,线程按入站连接放入队列的顺序处理来自每个入站连接的消息。此设计在此处说明:

但是,此设计要求入站连接合理地频繁发送数据。如果入站连接可能长时间处于非活动状态,则大量的非活动连接实际上可能会阻塞线程池中的所有线程。这意味着服务器的响应速度变慢甚至不响应。

一些服务器设计试图通过使线程池中的线程数具有一定的弹性来缓解此问题。例如,如果线程池用完了线程,则线程池可能会启动更多线程来处理负载。此解决方案意味着需要大量的慢速连接才能使服务器无响应。但是请记住,可以运行多少个线程仍然存在上限。因此,如果连接速度为1.000.000,连接速度很慢。

4 基本的无阻塞IO管道设计

非阻塞IO管道可以使用单个线程从多个流中读取消息。这要求流可以切换到非阻塞模式。在非阻塞模式下,当您尝试从流中读取数据时,流可能返回0个或多个字节。如果流中没有要读取的数据,则返回0字节。当流实际有一些数据要读取时,将返回1+字节。

为了避免检查要读取的0字节流,我们使用Java NIO选择器。一个或多个SelectableChannel实例可以向注册Selector。当您调用select()或selectNow()时,Selector它只会为您提供SelectableChannel实际具有要读取的数据的 实例。此设计在此处说明:

5 读取部分消息

当我们从读取数据块时,SelectableChannel我们不知道该数据块包含的消息是少于还是多。数据块可能包含部分消息(少于一条消息),一条完整消息或多于一条消息,例如1.5或2.5条消息。此处说明了各种部分消息的可能性:

处理部分消息有两个挑战:

  1. 检测数据块中是否有完整的消息。
  2. 在部分消息到达之前如何处理部分消息。

要检测完整消息,需要消息阅读器查看数据块中的数据,以查看数据是否包含至少一条完整消息。如果数据块包含一个或多个完整消息,则可以将这些消息沿管道发送以进行处理。查找完整消息的过程将重复很多,因此该过程必须尽可能快。

每当数据块中有部分消息时,无论是本身还是在一个或多个完整消息之后,都需要存储该部分消息,直到该消息的其余部分从到达为止Channel。

消息阅读器负责检测全部消息和存储部分消息。为了避免混合来自不同Channel实例的消息数据,我们将每个使用一个消息阅读器Channel。设计看起来像这样:

在检索到Channel具有要从中读取数据的实例之后Selector,与之关联的消息读取器将Channel读取数据,并尝试将其分解为消息。如果这导致读取任何完整的消息,则可以将这些消息沿读取管道向下传递到需要处理它们的任何组件。

消息阅读器当然是特定于协议的。消息阅读器需要知道它试图读取的消息的消息格式。如果我们的服务器实现可跨协议重用,则需要能够插入消息阅读器实现-可能通过某种方式接受消息阅读器工厂作为配置参数。

6 存储部分消息

现在我们已经确定了消息阅读器的职责是存储部分消息,直到收到完整的消息为止,我们需要弄清楚应该如何实现部分消息存储。

我们应考虑两个设计注意事项:

  1. 我们希望尽可能少地复制消息数据。复制越多,性能越低。
  2. 我们希望将完整的消息存储在连续的字节序列中,以使解析消息更加容易。

6.1 每个消息阅读器的缓冲区

显然,部分消息需要存储在某种缓冲区中。直接的实现是在每个消息阅读器内部简单地具有一个缓冲区。但是,该缓冲区应该有多大?它必须足够大以能够存储最大允许的消息。因此,如果允许的最大消息为1MB,则每个消息阅读器中的内部缓冲区至少需要为1MB。

当我们达到数百万个连接时,每个连接使用1MB并不能真正起作用。1.000.000 x 1MB仍是1TB内存!如果最大邮件大小为16MB,该怎么办?还是128MB?

6.2 可调整大小的缓冲区

另一个选择是实现可调整大小的缓冲区,以供在每个Message Reader中使用。可调整大小的缓冲区将从较小的缓冲区开始,如果消息对于该缓冲区而言太大,则缓冲区将被扩展。这样,每个连接将不一定需要例如1MB的缓冲区。每个连接仅占用它们容纳下一条消息所需的内存。

有几种方法可以实现可调整大小的缓冲区。它们都有优点和缺点,因此我将在以下各节中讨论它们。

6.3 按副本调整大小

实现可调整大小的缓冲区的第一种方法是从一个小缓冲区开始,例如4KB。如果消息不能容纳在4KB缓冲区中,则可以分配更大的缓冲区(例如8KB),并将来自4KB缓冲区的数据复制到更大的缓冲区中。

按副本调整大小缓冲区实现的优点是,一条消息的所有数据都保存在一个连续的字节数组中。这使得解析消息更加容易。

“按副本调整大小”缓冲区实现的缺点是,它将导致针对较大消息复制大量数据。

为了减少数据复制,您可以分析流经系统的消息的大小,以找到可以减少复制数量的某些缓冲区大小。例如,您可能会看到大多数消息都小于4KB,因为它们仅包含很小的请求/响应。这意味着第一个缓冲区大小应为4KB。

然后,您可能会看到,如果一条消息大于4KB,通常是因为它包含一个文件。然后,您可能会注意到,流经系统的大多数文件都小于128KB。然后将第二个缓冲区的大小设置为128KB是有意义的。

最终,您可能会看到,一旦消息超过128KB,就没有真正的消息大小模式,因此,最终缓冲区大小可能只是最大消息大小。

使用这3种缓冲区大小(基于流经系统的消息大小),可以减少数据复制。4KB以下的消息将永远不会被复制。对于1.000.000并发连接,导致1.000.000 x 4KB = 4GB,这在今天(2015年)的大多数服务器中都是可能的。4KB和128KB之间的消息将被复制一次,并且仅4KB数据将需要复制到128KB缓冲区中。128KB和最大消息大小之间的消息将被复制两次。第一次将复制4KB,第二次将复制128KB,因此对于最大消息,总共将复制132KB。假设没有太多消息超过128KB,这是可以接受的。

一旦消息已被完全处理,分配的内存应再次释放。这样,从同一连接接收到的下一条消息将再次以最小的缓冲区大小开始。必须确保在连接之间可以更有效地共享内存。很有可能并非所有连接都同时需要大缓冲区。

我这里有一个完整的教程,介绍如何实现支持可调整大小的数组的内存缓冲区: Resizable Arrays。本教程还包含指向GitHub存储库的链接,其中包含显示有效实施的代码。

6.4 通过追加调整大小

调整缓冲区大小的另一种方法是使缓冲区包含多个数组。当您需要调整缓冲区的大小时,您只需分配另一个字节数组并将数据写入其中。

有两种方法可以增加这种缓冲区。一种方法是分配单独的字节数组,并保留这些字节数组的列表。另一种方法是分配更大的共享字节数组的切片,然后保留分配给缓冲区的切片的列表。就个人而言,我认为切片方法稍好一些,但差异很小。

通过将单独的数组或切片附加到缓冲区中来增加缓冲区的优点是,在写入过程中无需复制任何数据。可以将所有数据直接从套接字(Channel)直接复制到数组或切片中。

以这种方式增长缓冲区的缺点是数据不会存储在单个连续的数组中。这使消息解析变得更加困难,因为解析器需要同时寻找每个单个数组的结尾和所有数组的结尾。由于您需要在书面数据中查找消息的结尾,因此使用此模型不太容易。

6.5 TLV编码消息

某些协议消息格式使用TLV格式(类型,长度,值)进行编码。这意味着,当消息到达时,消息的总长度将存储在消息的开头。这样,您立即知道要为整个消息分配多少内存。

TLV编码使内存管理更加容易。您立即知道要为该消息分配多少内存。在仅部分使用的缓冲区的末尾不会浪费任何内存。

TLV编码的一个缺点是,您必须在消息的所有数据到达之前为消息分配所有内存。因此,一些发送大消息的慢速连接可以分配所有可用的内存,从而使服务器无响应。

解决此问题的方法是使用一种消息格式,其中包含多个TLV字段。因此,为每个字段分配内存,而不是为整个消息分配内存,并且仅在字段到达时才分配内存。尽管如此,大字段对大容量消息的影响仍与大消息相同。

另一个解决方法是使在10到15秒内未收到的消息超时。这可以使您的服务器从同时出现的许多大消息中恢复过来,但仍会使服务器在一段时间内无响应。此外,故意的DoS(拒绝服务)攻击仍可能导致为服务器完全分配内存。

TLV编码存在不同的变体。确切地使用了多少字节,因此指定字段的类型和长度取决于每个单独的TLV编码。也有TLV编码将字段的长度放在首位,然后是类型,然后是值(LTV编码)。尽管字段的顺序不同,但这仍然是TLV的变体。

TLV编码使内存管理更容易的事实是HTTP 1.1如此糟糕的协议的原因之一。这是他们试图在HTTP 2.0中解决的问题之一,在HTTP 2.0中,数据以LTV编码的帧进行传输。这也是为什么我们为 使用TLV编码的VStack.co项目设计了自己的网络协议的原因。

7 编写部分消息

在无阻塞的IO管道中,写入数据也是一个挑战。当您 以非阻塞模式调用write(ByteBuffer)a时Channel,无法保证ByteBuffer其中写入了多少字节。该write(ByteBuffer)方法返回已写入的字节数,因此可以跟踪已写入的字节数。这就是挑战:跟踪部分写入的消息,以便最后发送一条消息的所有字节。

要管理将部分消息写入A,Channel我们将创建一个消息编写器。就像使用消息阅读器一样,每次Channel向其写入消息时,我们都需要一个消息编写器。在每个Message Writer内,我们都跟踪当前正在写入的消息的确切字节数。

如果到达消息编写器的消息多于直接写入Channel的消息,则需要在消息编写器内部将消息排队。然后,消息编写器将消息尽快写入Channel。

这是到目前为止到目前为止如何设计部分消息的示意图:

为了使消息编写器能够发送仅部分发送的较早消息,需要不时调用消息编写器,以便它可以发送更多数据。

如果您有很多连接,则将有很多Message Writer实例。检查例如一百万个Message Writer实例以查看它们是否可以写入任何数据很慢。首先,许多Message Writer实例很多没有任何要发送的消息。我们不想检查那些Message Writer实例。其次,并非所有Channel 实例都准备好向其中写入数据。我们不想浪费时间尝试将数据写入Channel 仍然无法接受任何数据的。

要检查是否Channel准备好写入,您可以使用来注册频道Selector。但是,我们不希望在中注册所有Channel实例Selector。想象一下,如果您有1.000.000个连接,其中大多数都是空闲的,并且所有1.000.000个连接都已在上注册Selector。然后,当您调用select()这些Channel 实例中的大多数时,它们将准备好进行写操作(它们大多处于空闲状态,还记得吗?)。然后,您必须检查所有这些连接的消息编写器,以查看它们是否有任何数据要写入。

为了避免检查所有Message Writer实例中的消息,以及始终Channel没有任何消息要发送给它们的所有实例,我们使用以下两步方法:

  1. 当一个消息被写入到一个消息编写器,消息编写器相关联的注册其Channel 与Selector(如果它尚未注册)。
     
  2. 当您的服务器有时间时,它将检查,Selector以查看哪些已注册Channel 实例可以进行写操作。对于每个准备就绪Channel的写入,都要求其关联的消息编写器将数据写入Channel。如果消息编写器将其所有消息写入 Channel,Channel则会从Selector再次注销。

这种分两步走的小方法可确保仅Channel将具有要写入消息的实例实际注册到Selector。

8 总结

如您所见,非阻塞服务器需要不时检查传入数据,以查看是否接收到任何新的完整消息。服务器可能需要多次检查,直到收到一个或多个完整的消息。仅检查一次是不够的。

同样,非阻塞服务器需要不时检查是否有任何数据要写入。如果是,则服务器需要检查是否有任何相应的连接已准备好将数据写入其中。仅在消息第一次排队时进行检查是不够的,因为消息可能会被部分写入。

总而言之,一个非阻塞服务器最终需要定期执行以下三个“管道”:

  • 读取管道,用于检查来自打开的连接的新传入数据。
  • 处理接收到的所有完整消息的处理管道。
  • 用于检查它是否可以将任何传出消息写入任何打开的连接的写管道。

这三个管道在一个循环中重复执行。您也许可以在某种程度上优化执行。例如,如果没有消息排队,则可以跳过写管道。或者,如果没有收到新的完整消息,则可以跳过流程管道。

这是说明整个服务器循环的图:

如果您仍然觉得这有点复杂,请记住查看GitHub仓库:

https://github.com/jjenkov/java-nio-server

也许看到运行中的代码可能有助于您了解如何实现此目的。

9 服务器线程模型

GitHub存储库中的非阻塞服务器实现使用具有2个线程的线程模型。第一个线程接受来自的传入连接ServerSocketChannel。第二个线程处理接受的连接,这意味着读取消息,处理消息并将响应写回到连接。这两个线程模型如下所示:

热门文章

优秀文章