流
什么是流
RabbitMQ 流是一个持久化的复制数据结构,可以完成与队列相同的功能:它们缓冲来自生产者的消息,这些消息会被消费者读取。但是,流在两个重要方面与队列不同:消息的存储和消费方式。
流模拟消息的追加式日志,可以重复读取直到过期。流始终是持久化和复制的。对这种流行为更技术的描述是“非破坏性消费者语义”。
要从 RabbitMQ 中的流读取消息,一个或多个消费者订阅它并根据需要多次读取相同的消息。
可以通过 RabbitMQ 客户端库或通过 专用的二进制协议 插件和关联的客户端来使用流中的数据。后一种选择**强烈推荐**,因为它提供了对所有流特定功能的访问,并提供了最佳的吞吐量(性能)。
现在,您可能正在问以下问题
为了回答这些问题,流的引入并非为了取代队列,而是为了补充它们。流为 RabbitMQ 的新用例打开了诸多可能性,这些用例在 流的使用场景 中进行了描述。
以下信息详细介绍了流的使用以及流的管理和维护操作。
您还应该查看 流插件 信息,以了解有关使用二进制 RabbitMQ 流协议使用流的更多信息,以及 流核心和流插件比较页面 以获取功能矩阵。
流的使用场景
流最初是为了涵盖 4 个现有队列类型无法提供或存在缺点的消息传递用例而开发的
大规模扇出
当希望将相同的消息传递给多个订阅者时,用户当前必须为每个消费者绑定一个专用的队列。如果消费者的数量很大,这可能会变得效率低下,尤其是在需要持久性和/或复制时。流将允许任意数量的消费者以非破坏性的方式从同一个队列消费相同的消息,从而无需绑定多个队列。流消费者还可以从副本读取,从而将读取负载分散到集群中。
回放(时间旅行)
由于所有当前的 RabbitMQ 队列类型都具有破坏性消费行为,即当消费者完成消息处理后会将其从队列中删除,因此无法重新读取已消费的消息。流将允许消费者在日志中的任何点附加并从该点开始读取。
吞吐量性能
没有持久性队列类型能够提供可以与任何现有基于日志的消息传递系统相媲美的吞吐量。流的设计以性能为主要目标。
大多数 RabbitMQ 队列旨在收敛到空状态并为此进行了优化,并且当给定队列上有数百万条消息时性能会下降。流旨在以有效的方式存储大量数据,并最大限度地减少内存开销。
如何使用 RabbitMQ 流
可以指定 可选队列和消费者参数 的 AMQP 0.9.1 客户端库将能够像使用常规 AMQP 0.9.1 队列一样使用流。
就像队列一样,流也必须首先声明。
声明 RabbitMQ 流
要声明一个流,请将
x-queue-type
队列参数设置为
stream
(默认值为
classic
)。客户端在声明时必须提供此参数;不能使用
策略
来设置或更改它。这是因为策略定义或适用的策略可以动态更改,但队列类型不能。它必须在声明时指定。
以下代码段显示了如何使用 AMQP 0.9.1 Java 客户端 创建流
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(
"my-stream",
true, // durable
false, false, // not exclusive, not auto-delete
Collections.singletonMap("x-queue-type", "stream")
);
声明一个
x-queue-type
参数设置为
stream
的队列将在每个配置的 RabbitMQ 节点上创建一个具有副本的流。流是仲裁系统,因此强烈建议集群大小不均匀。
流仍然是 AMQP 0.9.1 队列,因此它可以在创建后绑定到任何交换机,就像任何其他 RabbitMQ 队列一样。
如果使用
管理 UI
声明,则必须使用队列类型下拉菜单指定
stream
类型。
x-max-length-bytes
设置流的最大大小(以字节为单位)。请参阅 保留 。默认值:未设置。
x-max-age
设置流的最大生存期。请参阅 保留 。默认值:未设置。
x-stream-max-segment-size-bytes
单位:字节。
流在磁盘上被划分为固定大小的段文件。此设置控制这些文件的大小。默认值:(500000000 字节)。
虽然可以通过策略配置此参数,但只有在声明流时设置(存在)策略时,它才会 应用 于流。如果对于匹配但预先存在的流更改了此参数,即使队列记录的有效策略可能表明它已更改,它也**不会更改**。
因此,最好只通过一个可选队列参数
x-stream-filter-size-bytes
进行配置。
虽然可以通过策略配置
x-stream-filter-size-bytes
,但只有在声明流时策略存在时,它才会
应用
于流
以下 Java 示例演示了如何在应用程序代码中在声明流时设置参数
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-queue-type", "stream");
arguments.put("x-max-length-bytes", 20_000_000_000); // maximum stream size: 20 GB
arguments.put("x-stream-max-segment-size-bytes", 100_000_000); // size of segment files: 100 MB
channel.queueDeclare(
"my-stream",
true, // durable
false, false, // not exclusive, not auto-delete
arguments
);
该值以字节为单位设置。
用于 过滤 的布隆过滤器的尺寸。该值必须在 16 到 255 之间。默认值:16。
客户端操作
由于流永远不会删除任何消息,因此任何消费者都可以从日志中的任何点开始读取/消费。这由
x-stream-offset
消费者参数控制。如果未指定,则消费者将从消费者启动后写入日志的下一个偏移量开始读取。支持以下值
first
- 从日志中第一个可用消息开始
last
- 这将从最后写入的消息“块”开始读取
(块是流中使用的存储和传输单元,简单来说,它是由几千条到几千条消息组成的一批消息,具体取决于入口)
next
- 与未指定任何偏移量相同
x-max-age
相同的规范(请参阅
保留
)
以下代码段显示了如何使用
first
偏移量规范
channel.basicQos(
100); // QoS must be specified
channel.basicConsume(
"my-stream",
false,
Collections.singletonMap("x-stream-offset", "first"), // "first" offset specification
(consumerTag, message) -> {
// message processing
// ...
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
},
consumerTag -> { });
以下代码段显示了如何指定要从中消费的特定偏移量
channel.basicQos(100); // QoS must be specified
channel.basicConsume(
"my-stream",
false,
Collections.singletonMap("x-stream-offset", 5000), // offset value
(consumerTag, message) -> {
// message processing
// ...
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
},
consumerTag -> { });
以下代码段显示了如何指定要从中消费的特定时间戳
// an hour ago
Date timestamp = new Date(System.currentTimeMillis() - 60 * 60 * 1_000)
channel.basicQos(100); // QoS must be specified
channel.basicConsume(
"my-stream",
false,
Collections.singletonMap("x-stream-offset", timestamp), // timestamp offset
(consumerTag, message) -> {
// message processing
// ...
channel.basicAck(message.getEnvelope().getDeliveryTag(), false)
; // ack is required
},
consumerTag -> { });
其他流操作
以下操作可以像使用经典队列和仲裁队列一样使用,但有些操作具有特定于队列的行为。
流的单一活动消费者功能
流的单一活动消费者是 RabbitMQ 3.11 及更高版本中提供的一项功能。它为流提供了 独占消费 和 消费连续性 。当多个共享相同流和名称的消费者实例启用单一活动消费者时,一次只有一个实例处于活动状态,因此将接收消息。其他实例将处于空闲状态。
单一活动消费者功能提供了 2 个好处
一篇 博文 提供了有关流的单活动消费者的更多详细信息。
超级流
超级流是一种通过将大型流划分为较小流来扩展的方法。它们与 单活动消费者 集成以保留分区内的消息顺序。超级流从 RabbitMQ 3.11 开始可用。
超级流是由单个普通流组成的逻辑流。它是使用 RabbitMQ 流扩展发布和消费的一种方式:一个大型逻辑流被划分为分区流,将存储和流量分散到多个集群节点上。
超级流仍然是一个逻辑实体:由于客户端库的智能性,应用程序将其视为一个“大型”流。超级流的拓扑结构基于 AMQP 0.9.1 模型 ,即交换机、队列以及它们之间的绑定。
可以使用任何 AMQP 0.9.1 库或
管理插件
创建超级流的拓扑结构,这需要创建一个直接交换机、“分区”流,并将它们绑定在一起。不过,使用
rabbitmq-streams add_super_stream
命令可能会更容易。以下是如何使用它来创建一个包含 3 个分区的
invoices
超级流
rabbitmq-streams add_super_stream invoices --partitions 3
使用
rabbitmq-streams add_super_stream --help
了解有关该命令的更多信息。
与单个流相比,超级流增加了复杂性,因此不应将其视为所有涉及流的用例的默认解决方案。仅当您确定已达到单个流的限制时,才考虑使用超级流。
一篇 博文 提供了超级流的概述。
RabbitMQ Stream 提供了一个服务器端过滤功能,该功能避免读取流的所有消息,而只在客户端进行过滤。当消费应用程序只需要消息的一个子集(例如,来自特定地理区域的消息)时,这有助于节省网络带宽。
流过滤受 流协议 、AMQP 0.9.1 和 STOMP 支持。示例将使用 AMQP 0.9.1。
为了使过滤功能正常工作,必须发布具有关联过滤器值的消息。此值使用
x-stream-filter-value
标头指定
channel.basicPublish(
"", // default exchange
"my-stream",
new AMQP.BasicProperties.Builder()
.headers(Collections.singletonMap(
"x-stream-filter-value", "california" // set filter value
))
.build(),
body
);
如果消费者只想接收给定过滤器值的消息,则必须使用
x-stream-filter
参数
channel.basicQos(100); // QoS must be specified
channel.basicConsume(
"my-stream",
false,
Collections.singletonMap("x-stream-filter", "california"), // set filter
(consumerTag, message) -> {
Map<String, Object> headers = message.getProperties().getHeaders();
// there must be some client-side filter logic
if ("california".equals(headers.get("x-stream-filter-value"))) {
// message processing
// ...
}
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
},
consumerTag -> { });
如上所示,也必须有一些客户端过滤逻辑,因为服务器端过滤是 概率性的 :不匹配过滤器值的消息仍然可以发送到消费者。服务器使用 布隆过滤器 ,这是一种节省空间的概率数据结构,其中可能存在误报。尽管如此,过滤仍然节省了一些带宽,这是它的主要目标。
有关过滤的其他说明
x-stream-match-unfiltered
参数设置为
true
以更改此行为并接收
未过滤的
消息。
x-stream-filter
消费者参数接受字符串,也接受字符串数组以接收不同过滤器值的消息。
一篇 第一篇博文 提供了流过滤的概述,而 第二篇博文 涵盖了内部细节。
功能比较:普通队列与流
流并非传统意义上的队列,因此与 AMQP 0.9.1 队列语义的契合度不高。许多其他队列类型支持的功能不受支持,并且由于队列类型的性质,永远不会被支持。
只要使用消费者确认,能够使用 普通队列 的 AMQP 0.9.1 客户端库就能使用流。
由于流的非破坏性读取语义,许多功能永远不会被支持。
功能矩阵
功能 | 经典 | 流 |
---|---|---|
非持久队列 | 是 | 否 |
排他性 | 是 | 否 |
每条消息的持久性 | 每条消息 | 始终 |
成员资格更改 | 自动 | 手动 |
TTL | 是 | 否(但请参见 保留 ) |
队列长度限制 | 是 | 否(但请参见 保留 ) |
将消息保存在内存中 | 请参见 经典队列 | 从不 |
消息优先级 | 是 | 否 |
消费者优先级 | 是 | 否 |
死信交换机 | 是 | 否 |
符合 策略 | 是 | (请参见 保留 ) |
对 内存警报 做出反应 | 是 | 否(使用最少的 RAM) |
毒性消息处理 | 否 | 否 |
全局 QoS 预取 | 是 | 否 |
非持久队列
流根据其假设的 用例 始终是持久的,它们不能像普通队列一样 非持久 。
排他性
流根据其假设的 用例 始终是持久的,它们不能像普通队列一样 排他 。它们并非旨在用作 临时队列 。
全局 QoS
流不支持全局 QoS 预取 ,其中通道为使用该通道的所有消费者设置单个预取限制。如果尝试从启用全局 QoS 的通道中的流进行消费,则将返回通道错误。
使用 每个消费者的 QoS 预取 ,这是几个流行客户端中的默认设置。
数据保留
流作为不可变的追加式磁盘日志实现。这意味着日志将无限增长,直到磁盘用完。为了避免这种情况,可以为每个流设置保留配置,该配置将根据总日志数据大小和/或年龄丢弃日志中最旧的数据。
有两个参数控制流的保留。这些可以组合使用。这些参数是在声明时使用队列参数设置的,或者作为可以动态更新的策略设置的。
max-age
:
有效单位:Y、M、D、h、m、s
例如,
7D
表示一周
max-length-bytes
:
以字节为单位的最大总大小
注意:保留是在每个段的基础上评估的,因此还有一个参数生效,那就是流的段大小。只要段至少包含一条消息,流就会始终保留至少一个段。当使用代理提供的 偏移量跟踪 时,每个消费者的偏移量会作为非消息数据持久化到流本身。
性能特征
由于流在执行任何操作之前都将所有数据持久化到磁盘,因此建议使用尽可能快的磁盘。
由于流的磁盘 I/O 密集型特性,其吞吐量会随着消息大小的增加而降低。
与仲裁队列一样,流也会受到集群大小的影响。流的副本越多,其吞吐量通常就越低,因为复制数据并达成共识需要完成更多工作。
控制初始复制因子
x-initial-cluster-size
队列参数控制初始流集群应跨越多少个 Rabbit 节点。
管理流副本
流的副本由操作员显式管理。当将新节点添加到集群时,它不会托管任何流副本,除非操作员将其显式添加到流的副本集中。
当必须停用(永久从集群中移除)节点时,必须将其显式地从其当前托管副本的所有流的副本列表中移除。
提供了两个
CLI 命令
来执行上述操作,即
rabbitmq-streams add_replica
和
rabbitmq-streams delete_replica
rabbitmq-streams add_replica [-p <vhost>] <stream-name> <node>
rabbitmq-streams delete_replica [-p <vhost>] <stream-name> <node>
要成功添加和移除副本,流协调器必须在集群中可用。
需要注意的是,在执行涉及成员资格更改的维护操作时,不要意外地使流不可用,从而导致失去仲裁。
由于流成员资格本身不嵌入到流中,因此目前无法完全安全地添加副本。因此,如果在任何时间存在不同步的副本,则无法添加另一个副本,并且会返回错误。
替换集群节点时,更安全的方法是先添加一个新节点,等待它同步,然后停用它替换的节点。
可以使用以下命令查询流的复制状态
rabbitmq-streams stream_status [-p <vhost>] <stream-name>
此外,可以使用以下命令重新启动流
rabbitmq-streams restart_stream [-p <vhost>] <stream-name>
流行为
每个流都有一个主写入器(领导者)和零个或多个副本。
领导者选举和故障处理
声明新流时,将随机选择托管其副本的节点集,但始终包含声明流的客户端连接到的节点。
哪个副本成为初始领导者可以通过三种方式控制,即使用
x-queue-leader-locator
可选队列参数
,设置
queue-leader-locator
策略键或在
配置文件
中定义
queue_leader_locator
键。以下是可能的值
client-local
:选择声明流的客户端连接到的节点。这是默认值。
balanced
:如果总体上少于 1000 个队列(经典队列、仲裁队列和流),则选择托管最少数量的流领导者的节点。如果总体上超过 1000 个队列,则选择一个随机节点。
流需要声明的节点中大多数节点可用才能正常运行。当承载流的 *leader* 的 RabbitMQ 节点发生故障或停止时,承载该流的 *replica* 之一的另一个节点将被选为 leader 并恢复操作。
发生故障并重新加入的副本将与 leader 重新同步(“追赶”)。与仲裁队列类似,副本的临时故障不需要从当前选出的 leader 进行完全重新同步。如果重新加入的副本落后于 leader,则只会传输增量数据。这个“追赶”过程不会影响 leader 的可用性。
必须显式添加副本。当添加新的副本时 添加 ,它将从 leader 同步整个流状态,类似于新添加的仲裁队列副本。
容错和在线副本的最小数量
共识系统可以在数据安全方面提供一定的保证。这些保证确实意味着在这些保证变得相关之前需要满足某些条件,例如需要至少三个集群节点才能提供容错,并且需要超过一半的成员才能正常工作。
可以通过表格描述不同大小集群的容错特性
集群节点数 | 容忍的节点故障数 | 容忍网络分区 |
---|---|---|
1 | 0 | 不适用 |
2 | 0 | 否 |
3 | 1 | 是 |
4 | 1 | 如果一方存在多数,则为是 |
5 | 2 | 是 |
6 | 2 | 如果一方存在多数,则为是 |
7 | 3 | 是 |
8 | 3 | 如果一方存在多数,则为是 |
9 | 4 | 是 |
使用流时的数据安全
流将数据复制到多个节点,并且只有在数据已复制到流副本的仲裁时才会发出发布者确认。
流始终将数据存储在磁盘上,但是,它们不会显式地将数据从操作系统页面缓存刷新(fsync)到底层存储介质,而是依赖操作系统在需要时进行刷新。这意味着服务器的非受控关闭可能导致该节点上承载的副本数据丢失。虽然理论上这打开了确认数据丢失的可能性,但在正常操作期间发生这种情况的可能性非常小,并且单个节点上的数据丢失通常只会从系统中的其他节点重新复制。
如果需要更高的数据安全性,请考虑改用仲裁队列,因为只有在至少有仲裁数量的节点写入 *并且* 将数据刷新到磁盘后才会发出发布者确认。
对于未使用发布者确认机制确认的消息,不提供任何保证 。此类消息可能会在“中途”丢失,在操作系统缓冲区中或以其他方式无法到达流 leader。
流可用性
流应该能够容忍少数流副本不可用,而不会或很少影响可用性。
请注意,根据所使用的 分区处理策略 ,RabbitMQ 可能会在恢复期间自行重启并重置节点,但只要不发生这种情况,此可用性保证应该仍然有效。
例如,具有三个副本的流可以容忍一个节点故障而不会丢失可用性。具有五个副本的流可以容忍两个节点故障,依此类推。
如果无法恢复仲裁数量的节点(例如,如果 3 个 RabbitMQ 节点中的 2 个永久丢失),则队列将永久不可用,并且很可能需要操作员介入才能恢复。
配置流
有关流协议端口、TLS 和其他配置,请参阅 流插件指南 。有关所需的流复制端口,请参阅 网络指南 。
流如何使用资源
流通常比仲裁队列具有更低的 CPU 和内存占用。
所有数据都存储在磁盘上,只有未写入的数据存储在内存中。
使用流时的偏移量跟踪
当使用代理提供的偏移量跟踪功能时(目前仅在使用 流插件 时可用),偏移量会作为非消息数据本身持久化到流中。这意味着,当请求偏移量持久化时,流的磁盘大小将随着每次偏移量持久化请求而略微增加。
消息编码
流在内部将其消息存储为 AMQP 1.0 编码的数据。这意味着当使用 AMQP 0.9.1 发布时会发生转换。尽管 AMQP 1.0 数据模型大多能够包含所有 AMQP 0.9.1 的数据模型,但有一些限制。如果 AMQP 0.9.1 消息包含具有复杂值的 header 条目(例如数组或表格),则这些 header 不会被转换。这是因为 header 作为应用程序属性存储在 AMQP 1.0 消息中,并且这些属性只能包含简单类型的值,例如字符串和数字。