MQ《RabbitMQ学习》重点

  1. 持久化
  2. 调度策略
  3. 分配策略
  4. 状态反馈
    1. 1. 信息发布的确认
    2. 2. 消息提取的确认
  5. 消息的BasicProperties
  6. 高可用

总结一下 4 + 1 个概念, 或者说, 五种角色:

  • Producing , 生产者, 产生消息的角色.
  • Exchange , 交换器, 在得到生产者的消息后, 把消息扔到队列的角色.
  • Queue , 队列, 消息暂时呆的地方.
  • Consuming , 消费者, 把消息从队列中取出的角色.
  • 消息 Message , RabbitMQ 中的消息有自己的一系列属性, 某些属性对信息流有直接影响.

在使用过程中, 我们通常还会关注如下的机制:

  • 持久化 , 服务重启时, 是否能恢复队列中的数据.
  • 调度策略 , 交换器如何把消息给到哪些队列, 是每个队列给一条, 或者把一条消息给多个队列.
  • 分配策略 , 队列面对消费者时, 如何把消息吐出去, 来一个消费者就把消息全给它, 还是只给一条.
  • 状态反馈 , 当消息从某一个队列中被提出后, 这个消息的生命周期就此结束, 还是说需要一个具体的信号以明确标识消息已被正确处理.

持久化

默认情况下, 消息, 队列, 交换器 都不具有持久化的性质. 如果我们需要持久化功能, 那么在声明的时候就需要配置好.

交换器和队列的持久化性质, 在声明时通过一个 durable 参数即可实现:

// 一经声明, 则其性质无法再被更改. 当然, 你可以删除它们再重新声明
channel.exchange_declare(exchange='first', type='fanout', durable=True)
channel.queue_declare(queue='hello', durable=True)

消息 的属性的持久化,需要配置delivery_mode属性,但仅限此条消息:

channel.basic_publish(exchange='first',
                      routing_key='',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                         delivery_mode = 2,
                      ))

消息可能会丢,设计fsync等。还需要配置其它的一些机制, 比如后面会谈到的 状态反馈 中的 confirm mode

调度策略

交换器 -> 队列。

交换器的类型, 内置的有四种, 分别是:

  • fanout:”广播”,把消息转发给所有绑定的队列
  • direct:”先匹配, 再投送”,routing_key 匹配时, 才会被交换器投送到绑定的队列中去.
  • topic:相当于支持模式的“direct”。可以使用两个通配符:*、#。
  • headers:根据规则匹配。自定义匹配规则,键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列.

分配策略

队列 -> 消费者。

ack 机制:一次只吐一条消息。

状态反馈

1. 信息发布的确认

生产者 -> exchange -> queue

这个过程中可能出现意外:

  • exchange 的名字写错了:直接投递失败,没问题。
  • exchange 得到消息后, 发现没有对应的 queue 可以投送。消息可能丢失
  • 投送到 queue 后当前没有消费者来提取它:消息保存在queue,没问题。

解决方案:

channel 设置到 confirm mode , 也称之为 Publisher Acknowledgements 机制 (和消息的 ack 机制区分开). 它的目的就是为了确认 Producing 发出的信息的状态.

channel.confirm_delivery()

之后的 publish 行为就可以收到服务器的反馈. 比如在 basic_publish 函数中, 通过 mandatory=True 参数来确认发出的消息是否有 queue 接收, 并且所有 queue 都成功接收.

2. 消息提取的确认

queue -> 消费者

在未关闭消息的 ack 机制的情况下, 当消息被 Consuming 从队列中提取后, 在未明确获取确认信息之前, 队列中的消息是不会被删除的.

  • 要确认消息, 或者拒绝消息, 使用对应的 basic_ackbaskc_reject 方法:分别是确认或拒绝多条消息。
  • rejectnack 中还有一个 requeue 参数, 表示被拒绝的消息是否可以被重新分配. 默认是 True

消息的BasicProperties

AMQP 协议中, 为消息预定了 14 个属性, 有些在前面我们已经用到过了, 有些则本来就很少用到:

  • content_type 标明消息的类型.
  • content_encoding 标明消息的编码.
  • headers 可扩展的信息对.
  • delivery_mode2 时表示该消息需要被持久化支持.
  • priority 该消息的权重.
  • correlation_id 用于”请求”与”响应”之间的匹配.
  • reply_to “响应”的目标队列.
  • expiration 有效期.
  • message_id 消息的ID.
  • timestamp 一个时间戳.
  • type 消息的类型.
  • user_id 用户的ID.
  • app_id 应用的ID.
  • cluster_id 服务集群ID.

高可用

RabbitMQ 有三种模式:

  • 单机模式
  • 普通集群模式:
  • 普通集群模式:以两个节点(rabbit01、rabbit02)为例来进行说明。对于Queue来说,消息实体只存在于其中一个节点rabbit01(或者rabbit02),rabbit01和rabbit02两个节点仅有相同的元数据,即队列的结构。当消息进入rabbit01节点的Queue后,consumer从rabbit02节点消费时,RabbitMQ会临时在rabbit01、rabbit02间进行消息传输,把A中的消息实体取出并经过B发送给consumer。
  • 镜像集群模式:将需要消费的[队列变为镜像队列,存在于多个节点,这样就可以实现RabbitMQ的HA高可用性。作用就是消息实体会主动在镜像节点之间实现同步,而不是像普通模式那样,在consumer消费数据时临时读取。缺点就是,集群内部的同步通讯会占用大量的网络带宽。

如何开启镜像集群模式:在控制台新增一个镜像集群模式的策略,指定的时候可以要求数据同步到所有节点,也可以要求同步到指定节点,然后在创建queue的时候,应用这个策略,就会自动将数据同步到其他的节点上面去了。


转载请注明来源