如果只停留在如何使用 RabbitMQ 层面,根据官方教程了解一些基本的组件比如 Channel, Exchange, Queue 这几个组件的概念以及他们之间的关系基本可以应对大部分使用场景了。但是本着学习其运行机制的心理,还是稍微深入一点,探究一下其消息发送的整个过程。

RabbitMQ 启动过程

初始化

image

首先会执行启动脚本,然后读取环境变量,加载配置,启动 erlang node,更据 RABBITMQ_START_RABBIT 环境变量启动对应的模块,默认会启动 Rabbit:boot ()。接下来会根据配置文件更新配置,执行一些初始化工作,例如初始化日志,检查集群一致性等。

image

紧接着启动 broker , 加载插件,然后看到启动日志。

image

之后会初始化一些内部状态,比如开启认证,启动队列镜像节点,HA 节点,Policy 验证初始化,默认启动一个优先队列,运行报警进程,内存监控,节点监控,开启监听端口,初始化 Mnesia 数据库中的表,比如用户表等。

image

建立连接

image

当客户端与 RabbitMQ 建立连接时,会经历两个阶段,一个是 TCP 握手,一个是 AMQP 握手。TCP 握手过程由 ranch acceptor 进程处理,这里说的进程都是指 erlang 进程。 ranch acceptor 进程是初始化时创建的默认 10 个,负责与客户端建立 TCP 连接,将连接交给 rabbit connection sup 进程,这个进程会创建 一个 rabbit reader 进程和一个 rabbit writer 进程,并给客户端返回 connection.start,然后 rabbit reader 进程等待客户端发来 AMQP 握手消息,AMQP 握手消息携带认证信息,认证成功返回 connection.tune,客户端收到后发送 connection.tune_okconnection.open,服务端发送 connention.open_ok,客户端收到后 AMQP 连接建立。可以看到,这里每个连接都会开启一个读进程和写进程来处理连接,所以即使 erlang 进程轻量,也不应该一直创建连接,而是要尽量复用连接。

开启 Channel

image

当客户端发起 channel.open 时,首先会交个 reader 进程处理,然后开启两个 channel sup 进程,这两个进程是分级的,二级 channel sup 进程会开启一个 channel 进程,然后返回 ChPidreaderreader 收到后给 channel common 发送 channel.openchannel common 发送给 channel,最后由 writer 返回给客户端 channel.open_ok , Channel 创建成功。

image

  • 这里会检查是不是开启 channel 0, 因为 channel 0 是共用的,并且是在初始化阶段就创建的。
  • 所有 channelPID 字典都会保存在 reader 进程的内存中,如果在内存中查不到才会重新创建。
  • 会判断创建的 channel 是否达到上限。

一个 channel 创建会同时创建很多的管理进程。如果使用 direct channel,那就只会创建一个 channel 进程和一个 limiter 进程,limiter 进程主要是实现 prefetch 功能,调节 channelconsumer 间的流量。

创建 / 声明 Exchange

image

消息交换过程与 channel 创建类似,但是这里 exchange 的信息会存到数据库里。

创建 Queue

image

创建队列时会首先在数据库中查找,如果已经存在并且 nowaitture,则直接返回 queue.declare_ok,如果为 false 则会向代理进程发起 stat 命令,代理进程判断将命令发送到本地队列还是远程队列,队列返回其对应的状态信息,如果已经启动,返回 queue.declare_ok。如果在数据中不存在,则会走创建流程,会根据队列的类型,比如是否优先级,持久化创建队列。

Queue 绑定到 Exchange

image

队列绑定到交换机会将绑定关系保存到数据库,有则直接返回,无则创建。

消费

image

和创建队列时一样,basic.consume 命令会通过代理进程发往本地队列或远程队列,如果开启 prefetch 功能的话会开启 limiter 进程来控制流量,prefetch 就是消费者可以允许一定量的消息不用确认,用于提高消费的吞吐。同时每次消费是都会创建一个优先级队列,用来实现消费者优先级。

生产

image

生产者发送消息时会首先根据 exchange 类型 routing key 等从数据库中取出相应队列的信息,然后通过代理进程发送到相应的队列进程,队列进程在真正后端队列中判断是否是重复,然后看有没有正在消费的消费者,如果有的话会直接发送给消费者,如果没有的话会通过消息存储把消息存储下来。

关于消息存储

  • 不管是持久化还是非持久化的消息都有可以写入磁盘,持久性消息一到达后端队列就会写入磁盘,而非持久化的消息只有在内存有压力的时候,才会被写入磁盘,持久化的消息也会保留在内存中 (为了能更快把消息的发送给消费者),只有当有内存有压力的时候才会从内存中淘汰。处理磁盘写入,内存淘汰的这一层称为持久层。
  • 持久层由两个部分组成,队列索引和消息存储。队列索引负责维护关于一个给定消息在队列中的位置,以及它是否已经被交付和确认。因此,每个队列有一个队列索引。消息存储是一个消息的键值存储,在服务器的所有队列中共享。消息(正文和任何元数据字段:属性和/或标题)可以直接存储在队列索引中,也可以写到消息存储中。

内存消耗

在内存压力下,持久化层试图尽可能多地写入磁盘,并尽可能多地从内存中删除。然而,有一些东西必须留在内存中。

  • 每个队列为每个未确认的消息维护一些元数据。如果消息的目的地是消息存储,那么消息本身可以从内存中删除。
  • 消息存储需要一个索引。默认的消息存储索引为存储中的每条消息使用少量的内存。

消息嵌入到消息索引中

优点:

  • 消息可以在一次操作中写入磁盘,而不是两次;对于消息大小很小的消息,可以减少写入次数,提升性能。
  • 写入队列索引的消息不需要在消息存储中存储,因此从内存中删除时只需删除索引,不需要再删除消息存储。

缺点:

  • 如果消息很大,那么内存占用很大。
  • 如果一个消息被交换机路由到多个队列,那么该消息需要写入多个队列索引。如果这样的消息被写入消息存储,则只需要写入一份副本。
  • 未确认的消息索引将一直保存在内存中一直占用内存。

集群

image

一个节点加入集群,Mnesia 数据库的配置会更新,Mnesia 存储引擎将同步节点状态,路由表到其他节点。

高可用

image

当设置为高可用时,普通队列就会变为镜像队列主,队列状态将被动态更新,然后所有的同步操作都将由 Erlang 进程处理。

主队列不能直接与 gm 进程交互,而是要使用协调者,每个从队列都有一个 gm 进程,彼此交互,发布,处理集群事件,复制,同步集群数据。