今天看啥  ›  专栏  ›  克里斯朵夫李维

RabbitMQ实战

克里斯朵夫李维  · 掘金  ·  · 2019-07-03 15:23
阅读 8

RabbitMQ实战

一、RabbitMQ是什么

RabbitMQ是一个基于AMQP协议的高级消息中间件,它主要的技术特点是可用性,安全性,高可用集群,多协议支持,可视化的客户端,活跃的社区。支持死信队列,优先级队列,延迟队列,重试队列等多种功能。一般单机的QPS在万级左右,可以满足一般的应用场景。文档说明非常丰富,社区活跃,上手容易。强大的可视化管理工具。

三、AMQP模型

  • Broker:消息代理,实际上就是消息服务器实体。

  • Exchange:交换机,用来发送消息的AMQP实体,生产者的消息发送到交换机,而不是直接发送到队列,交换机指定消息按什么路由规则,路由到哪个队列。

  • Binding:它的作用就是把交换机和队列按照路由规则绑定起来。

  • Routing Key:路由关键字,交换机根据这个关键字进行消息投递。

  • Virtaul Host:虚拟主机,一个消息代理里面可以开设多个虚拟主机进行逻辑隔离,用于不同用户的权限分离。

  • Connection:AMQP连接通常是长连接,Producer和Consumer都是通过TCP连接到RabbitMQ Server的。

  • Channel:AMQP通过Channle来处理多连接,可以把通道理解成共享一个TCP连接的多个轻量化连接。

四、交换机类型

  • Default:RabbitMQ提供了一个默认交换机,它隐式地绑定到每个队列,其路由键等于队列名称,我们无法显式的将队列绑定到默认交换器或从默认交换器解绑,它也不能被删除。

  • Direct:直连型交换机,根据消息携带的路由键将消息投递给对应队列的。直连交换机要求Publisher和Consumer的路由关键字完全相同才会将消息路由到绑定的队列。直连交换机用来处理消息的单播路由,虽然它可以进行多播。

    • 单播
    • 多播

  • Funout:扇型交换机,将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键,扇型用来交换机处理消息的广播路由。

  • Topic:将消息路由给一个或多个队列,属于多播路。topic可以进行模糊匹配,可以使用*#这两个通配符来进行模糊匹配,其中*可以代替一个单词(必须有一个单词),#号可以代替任意个单词,但是需要注意的是topic交换机的路由键也不是可以随意设置的,必须是由.隔开的一系列的标识符组成。标识符一般和消息的某些特性相关,可以定义任意数量的标识符,上限为255个字节,当路由键可以模糊匹配上的时候就能将消息映射到绑定的队列中去。

  • Header:不常用,后续再补充。

四、编程模型

RabbitMQ消息传递模型的核心思想是,生产者永远不会将任何消息直接发送到队列。实际上,通常生产者甚至不知道消息是否会被传递到那些队列。生产者只向交换机发送消息,交换机一边接收生产者的消息,一边将消息推送到队列,交换机的类型定义了消息被处理的规则。

1. Hello World

  • 队列一般在Consumer声明,因为我们可能在Publisher启动前就启动Consumer,所以我们要确保在使用消息前队列就存在,并且Publiser的消息是发送到Exchange,而不是队列。

  • Consumer不使用try-with-resource自动关闭Connection和Channel,因为我们希望在消息异步被发送到Consumer时程序仍然保持活动状态。

2. Work Queues

使用任务队列的优点之一是我们可以轻松的并行工作,当任务提交速率远大于worker消费速率的时候,我们可以添加更多的worker以提高系统的吞吐量。

默认情况下,RabbitMQ将按顺序将每个消息发送给下一个消费者,平均每个消费者将得到相同数量的消息。这种分发消息的方式称为循环调度(round-robin)。

2.1 消息确认

完成一项任务可能需要几秒钟,当其中一个消费者启动了一个耗时很长的任务,但是只完成了一部分就挂掉了,在不做任何设置的情况下,一旦RabbitMQ向消费者投递了一条消息 ,那么消息代理会立即将这条消息标记为已删除。在这种情况下,如果消费者宕机,那我们将会丢失它正在处理的消息和已经派发给这个消费者但是尚未被他处理的消息。

但是我们并不希望任何一个任务被丢失,如果一个消费者宕机,我们希望把任务派发给另一个消费者(包括被宕机消费者处理了一半的任务)。为了确保消息永远不会丢失,RabbitMQ支持 message acknowledgments。消费者会返回一个ack(nowledgement)告诉RabbitMQ消费者已经接收并处理了一条消息并且RabbitMQ可以删除它。

如果一个消费者正在处理一条消息时挂掉了(channel is closed, connection is closed, 或者TCP 连接丢失),它将没有机会发送ack确认,RabbitMQ就认为该消息没有消费成功,于是便会将该消息重新放到队列中,如果此时有其他消费者在线,RabbitMQ会立即将该条消息再转发给其他在线的消费者,这种机制可以保证任何消息都不会丢失。

我们通过autoAck=fasle显示关闭自动确认。

2.2 消息持久化

我们已经学会了如何确保即使消费者挂掉,消息也不会被丢失,但是如果RabbitMQ服务器挂掉,我们的消息依旧会被丢失。当RabbitMQ服务器stop或者奔溃的时候,队列和消息将会被丢失,为了确保消息不被丢失我们需要做两件事:将队列和消息都标记为持久化的。

将消息标记为持久化并不能完全保证消息不会丢失。虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ收到了一条消息并且还没有保存它时,仍然有一个短时间窗口。此外,RabbitMQ不会对每条消息都执行持久化磁盘得操作——它可能只是保存到缓存中,而不是真正写到磁盘上。持久性保证并不强,但对于我们的简单任务队列来说已经足够了。如果需要更强的保证,可以使用publisher confirm

源码:github.com/IndiraFinis…




原文地址:访问原文地址
快照地址: 访问文章快照