RabbitMQ介绍

什么是RabbitMQ?

RabbitMQ是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP,高级消息队列协议) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成。最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

RabbitMQ主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。

例如一个日志系统,很容易使用RabbitMQ简化工作量,一个Consumer可以进行消息的正常处理,另一个Consumer负责对消息进行日志记录,只要在程序中指定两个Consumer所监听的queue以相同的方式绑定到同一exchange即可,剩下的消息分发工作由RabbitMQ完成。

单向解耦

“Producer”--
|
|----->"RabbitMQ Clusters" ---> “Consumer”
"Producer"--

双向解耦(如:RPC)

“Producer1”-->
|
|<----->"RabbitMQ Clusters" <---> “Consumer2&Producer2”
"Consumer1"<--

RabbitMQ的使用基础

使用RabbitMQ server需要:

  1. ErLang语言包;
  2. RabbitMQ安装包;

基础概念

交换机(exchange):

  1. 接收消息,转发消息到绑定的队列。四种类型:direct, topic, headers and fanout
  • direct:转发消息到routigKey指定的队列
  • topic:按规则转发消息(最灵活)
  • headers:(这个还没有接触到)
  • fanout:转发消息到所有绑定队列(广播模式)
  1. 如果交换机上(Exchange)和(Queue)是多对多的关系。

  2. topic类型交换器通过模式匹配分析消息的routing-key属性。它将routing-key和binding-key的字符串切分成单词。这些单词之间用点隔开。

    支持表达式:
    *.foo.bar # 只要包含foo.bar就可以匹配相关信息。这个是topic,性能最慢
    demo # 这个是direct,性能最好。

  3. 因为交换器是在RabbitMQ是一个实际存在的实体,不能被改变。只能删除之后,重新创建。

  4. 交换器的属性:

  • 持久性:如果启用,交换器将会在server重启前都有效。(对应Duration属性,持久化)
  • 自动删除:如果启用,那么交换器将会在其绑定的队列都被删除掉之后自动删除掉自身。(创建时候设置,如果不设置不会自动删除)。
  • 惰性:如果没有声明交换器,那么在执行到使用的时候会导致异常,并不会主动声明。(不会自动创建)

    队列(queue):

  1. 队列是RabbitMQ内部对象,存储消息。相同属性的queue可以重复定义。

  2. 临时队列。channel.queueDeclare(),有时不需要指定队列的名字,并希望断开连接时删除队列。

队列的属性:

  • 持久性:如果启用,队列将会在server重启前都有效。
  • 自动删除:如果启用,那么队列将会在所有的消费者停止使用之后自动删除掉自身。
  • 惰性:如果没有声明队列,那么在执行到使用的时候会导致异常,并不会主动声明。
  • 排他性:如果启用,队列只能被声明它的消费者使用。
  • 消息传递:

消息在队列中保存,以轮询的方式将消息发送给监听消息队列的消费者,可以动态的增加消费者以提高消息的处理能力。
为了实现负载均衡,可以在消费者端通知RabbitMQ,一个消息处理完之后才会接受下一个消息。 channel.basic_qos(prefetch_count=1) 注意:效率非常低,不能使用客户端缓存。
消息有14个属性,最常用的几种:

  • deliveryMode:持久化属性
  • contentType:编码
  • replyTo:指定一个回调队列
  • correlationId:消息id
    在client代码中,send方法时候,可以设置mandatory和immediate。设置mandatory:发送到交换器并且还未投递到队列(没有绑定器存在)得到通知。设置immediate:没有消费者能够立即处理的时候得到通知。这些投递保障机制,保证了消息可靠性。

在client代码中,send方法时候persistent属性为true。数据就会被保存到队列中,但是必须Exchange,Queue,Client三者都设置为存储状态。

RabbitMQ特性

高可用性(HA)

  1. 消息ACK,通知RabbitMQ消息已被处理,可以从内存删除。如果消费者因宕机或链接失败等原因没有发送ACK(不同于ActiveMQ,在RabbitMQ里,消息没有过期的概念),则RabbitMQ会将消息重新发送给其他监听在队列的下一个消费者。
    channel.basicConsume(queuename, noAck=false, consumer);
  2. 消息和队列的持久化。定义队列时可以指定队列的持久化属性(问:持久化队列如何删除?) channel.queueDeclare(queuename, durable=true, false, false, null); 发送消息时可以指定消息持久化属性:这样,即使RabbitMQ服务器重启,也不会丢失队列和消息。
    channel.basicPublish(exchangeName, routingKey,
    MessageProperties.PERSISTENT_TEXT_PLAIN,
    message.getBytes());
  3. publisher confirms 提供批量确认消息的方法。
  4. master/slave机制,配合Mirrored Queue。Mirrored Queue通过policy和rabbitmqctl设置可以实现。具体可以参考Rabbitmq官方文档。在Mirrored Queue下,无论Producer和Consumer连接那个RabbitMq服务器,都跟连接同一个RabbitMQ上,消费和生产数据会被同步。注意:Mirrored Queue会严重的消耗性能,性能会下降到原来的1/5。当一个slave重新加入mirrored-queue时,如果queue是durable的,则会被清空。
    通过命令行或管理插件可以查看哪个slave是同步的:
    rabbitmqctl list_queues name slave_pids synchronised_slave_pids

集群(cluster)

  1. 不支持跨网段,因为RabbitMQ底层是Erlang,会导致脑裂(Slave Node感觉Master Node死掉了,主Master Node觉得Slave2 Node死掉了,结果数据无法复制,系统逻辑出现问题)(如需支持,需要shovel或federation插件)
  2. 可以随意的动态增加或减少、启动或停止节点,允许节点故障。(但是数据同步会造成Queue服务暂停,所有的Producer和Consumer都被终止)
  3. 集群分为RAM节点和DISK节点,一个集群最好至少有一个DISK节点保存集群的状态。
  4. 集群的配置可以通过命令行,也可以通过配置文件,命令行优先。

参考资料