总结一下 4 + 1
个概念, 或者说, 五种角色:
- Producing , 生产者, 产生消息的角色.
- Exchange , 交换器, 在得到生产者的消息后, 把消息扔到队列的角色.
- Queue , 队列, 消息暂时呆的地方.
- Consuming , 消费者, 把消息从队列中取出的角色.
- 消息 Message , RabbitMQ 中的消息有自己的一系列属性, 某些属性对信息流有直接影响.
在使用过程中, 我们通常还会关注如下的机制:
- 持久化 , 服务重启时, 是否能恢复队列中的数据.
- 调度策略 , 交换器如何把消息给到哪些队列, 是每个队列给一条, 或者把一条消息给多个队列.
- 分配策略 , 队列面对消费者时, 如何把消息吐出去, 来一个消费者就把消息全给它, 还是只给一条.
- 状态反馈 , 当消息从某一个队列中被提出后, 这个消息的生命周期就此结束, 还是说需要一个具体的信号以明确标识消息已被正确处理.
持久化
默认情况下, 消息, 队列, 交换器 都不具有持久化的性质. 如果我们需要持久化功能, 那么在声明的时候就需要配置好.
交换器和队列的持久化性质, 在声明时通过一个 durable
参数即可实现:
// 一经声明, 则其性质无法再被更改. 当然, 你可以删除它们再重新声明
channel.exchange_declare(exchange='first', type='fanout', durable=True)
channel.queue_declare(queue='hello', durable=True)
消息 的属性的持久化,需要配置delivery_mode
属性,但仅限此条消息:
channel.basic_publish(exchange='first',
routing_key='',
body='Hello World!',
properties=pika.BasicProperties(
delivery_mode = 2,
))
消息可能会丢,设计fsync等。还需要配置其它的一些机制, 比如后面会谈到的 状态反馈 中的 confirm mode。
调度策略
交换器 -> 队列。
交换器的类型, 内置的有四种, 分别是:
- fanout:”广播”,把消息转发给所有绑定的队列
- direct:”先匹配, 再投送”,routing_key 匹配时, 才会被交换器投送到绑定的队列中去.
- topic:相当于支持模式的“direct”。可以使用两个通配符:*、#。
- headers:根据规则匹配。自定义匹配规则,键值对(
headers
属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列.
分配策略
队列 -> 消费者。
ack 机制:一次只吐一条消息。
状态反馈
1. 信息发布的确认
生产者 -> exchange -> queue
这个过程中可能出现意外:
exchange
的名字写错了:直接投递失败,没问题。- exchange 得到消息后, 发现没有对应的 queue 可以投送。消息可能丢失!
- 投送到 queue 后当前没有消费者来提取它:消息保存在queue,没问题。
解决方案:
把 channel 设置到 confirm mode , 也称之为 Publisher Acknowledgements 机制 (和消息的 ack 机制区分开). 它的目的就是为了确认 Producing 发出的信息的状态.
channel.confirm_delivery()
之后的 publish 行为就可以收到服务器的反馈. 比如在 basic_publish
函数中, 通过 mandatory=True
参数来确认发出的消息是否有 queue 接收, 并且所有 queue 都成功接收.
2. 消息提取的确认
queue -> 消费者
在未关闭消息的 ack 机制的情况下, 当消息被 Consuming 从队列中提取后, 在未明确获取确认信息之前, 队列中的消息是不会被删除的.
- 要确认消息, 或者拒绝消息, 使用对应的
basic_ack
和baskc_reject
方法:分别是确认或拒绝多条消息。 - 在
reject
和nack
中还有一个requeue
参数, 表示被拒绝的消息是否可以被重新分配. 默认是True
消息的BasicProperties
在 AMQP 协议中, 为消息预定了 14 个属性, 有些在前面我们已经用到过了, 有些则本来就很少用到:
- content_type 标明消息的类型.
- content_encoding 标明消息的编码.
- headers 可扩展的信息对.
- delivery_mode 为
2
时表示该消息需要被持久化支持. - priority 该消息的权重.
- correlation_id 用于”请求”与”响应”之间的匹配.
- reply_to “响应”的目标队列.
- expiration 有效期.
- message_id 消息的ID.
- timestamp 一个时间戳.
- type 消息的类型.
- user_id 用户的ID.
- app_id 应用的ID.
- cluster_id 服务集群ID.
高可用
RabbitMQ 有三种模式:
- 单机模式
- 普通集群模式:
- 普通集群模式:以两个节点(rabbit01、rabbit02)为例来进行说明。对于Queue来说,消息实体只存在于其中一个节点rabbit01(或者rabbit02),rabbit01和rabbit02两个节点仅有相同的元数据,即队列的结构。当消息进入rabbit01节点的Queue后,consumer从rabbit02节点消费时,RabbitMQ会临时在rabbit01、rabbit02间进行消息传输,把A中的消息实体取出并经过B发送给consumer。
- 镜像集群模式:将需要消费的[队列变为镜像队列,存在于多个节点,这样就可以实现RabbitMQ的HA高可用性。作用就是消息实体会主动在镜像节点之间实现同步,而不是像普通模式那样,在consumer消费数据时临时读取。缺点就是,集群内部的同步通讯会占用大量的网络带宽。
如何开启镜像集群模式:在控制台新增一个镜像集群模式的策略,指定的时候可以要求数据同步到所有节点,也可以要求同步到指定节点,然后在创建queue的时候,应用这个策略,就会自动将数据同步到其他的节点上面去了。
转载请注明来源