问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

RabbitMQ 的七种消息传递形式

发布网友 发布时间:2024-09-27 00:26

我来回答

1个回答

热心网友 时间:2024-10-27 05:43

今天这篇文章比较简单,来和小伙伴们分享一下 RabbitMQ 的七种消息传递形式。一起来看看。

大部分情况下,我们可能都是在 Spring Boot 或者 Spring Cloud 环境下使用 RabbitMQ,因此本文我也主要从这两个方面来和大家分享 RabbitMQ 的用法。

1. RabbitMQ 架构简介

一图胜千言,如下:

这张图中涉及到如下一些概念:

生产者(Publisher):发布消息到 RabbitMQ 中的交换机(Exchange)上。

交换机(Exchange):和生产者建立连接并接收生产者的消息。

消费者(Consumer):监听 RabbitMQ 中的 Queue 中的消息。

队列(Queue):Exchange 将消息分发到指定的 Queue,Queue 和消费者进行交互。

路由(Routes):交换机转发消息到队列的规则。

2. 准备工作

大家知道,RabbitMQ 是 AMQP 阵营里的产品,Spring Boot 为 AMQP 提供了自动化配置依赖 spring-boot-starter-amqp,因此首先创建 Spring Boot 项目并添加该依赖,如下:

项目创建成功后,在 application.properties 中配置 RabbitMQ 的基本连接信息,如下:

spring.rabbitmq.host=localhostspring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.port=5672

接下来进行 RabbitMQ 配置,在 RabbitMQ 中,所有的消息生产者提交的消息都会交由 Exchange 进行再分配,Exchange 会根据不同的策略将消息分发到不同的 Queue 中。

RabbitMQ 官网介绍了如下几种消息分发的形式:

这里给出了七种,其中第七种是消息确认,消息确认这块松哥之前发过相关的文章,传送门:

四种策略确保 RabbitMQ 消息发送可靠性!你用哪种?

RabbitMQ 高可用之如何确保消息成功消费

所以这里我主要和大家介绍前六种消息收发方式。

3. 消息收发3.1 Hello World

咦?这个咋没有交换机?这个其实是默认的交换机,我们需要提供一个生产者一个队列以及一个消费者。消息传播图如下:

来看看代码实现:

先来看看队列的定义:

@ConfigurationpublicclassHelloWorldConfig{publicstaticfinalStringHELLO_WORLD_QUEUE_NAME="hello_world_queue";@BeanQueuequeue1(){returnnewQueue(HELLO_WORLD_QUEUE_NAME);}}

再来看看消息消费者的定义:

@ComponentpublicclassHelloWorldConsumer{@RabbitListener(queues=HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)publicvoidreceive(Stringmsg){System.out.println("msg="+msg);}}

消息发送:

@SpringBootTestclassRabbitmqdemoApplicationTests{@AutowiredRabbitTemplaterabbitTemplate;@TestvoidcontextLoads(){rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME,"hello");}}

这个时候使用的其实是默认的直连交换机(DirectExchange),DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上,例如消息队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的消息会被该消息队列接收。

3.2 Work queues

这种情况是这样的:

一个生产者,一个默认的交换机(DirectExchange),一个队列,两个消费者,如下图:

一个队列对应了多个消费者,默认情况下,由队列对消息进行平均分配,消息会被分到不同的消费者手中。消费者可以配置各自的并发能力,进而提高消息的消费能力,也可以配置手动 ack,来决定是否要消费某一条消息。

先来看并发能力的配置,如下:

@ComponentpublicclassHelloWorldConsumer{@RabbitListener(queues=HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)publicvoidreceive(Stringmsg){System.out.println("receive="+msg);}@RabbitListener(queues=HelloWorldConfig.HELLO_WORLD_QUEUE_NAME,concurrency="10")publicvoidreceive2(Stringmsg){System.out.println("receive2="+msg+"------->"+Thread.currentThread().getName());}}

可以看到,第二个消费者我配置了 concurrency 为 10,此时,对于第二个消费者,将会同时存在 10 个子线程去消费消息。

启动项目,在 RabbitMQ 后台也可以看到一共有 11 个消费者。

此时,如果生产者发送 10 条消息,就会一下都被消费掉。

消息发送方式如下:

@SpringBootTestclassRabbitmqdemoApplicationTests{@AutowiredRabbitTemplaterabbitTemplate;@TestvoidcontextLoads(){for(inti=0;i<10;i++){rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME,"hello");}}}

消息消费日志如下:

可以看到,消息都被第一个消费者消费了。但是小伙伴们需要注意,事情并不总是这样(多试几次就可以看到差异),消息也有可能被第一个消费者消费(只是由于第二个消费者有十个线程一起开动,所以第二个消费者消费的消息占比更大)。

当然消息消费者也可以开启手动 ack,这样可以自行决定是否消费 RabbitMQ 发来的消息,配置手动 ack 的方式如下:

spring.rabbitmq.listener.simple.acknowledge-mode=manual

消费代码如下:

@ComponentpublicclassHelloWorldConsumer{@RabbitListener(queues=HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)publicvoidreceive(Messagemessage,Channelchannel)throwsIOException{System.out.println("receive="+message.getPayload());channel.basicAck(((Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)),true);}@RabbitListener(queues=HelloWorldConfig.HELLO_WORLD_QUEUE_NAME,concurrency="10")publicvoidreceive2(Messagemessage,Channelchannel)throwsIOException{System.out.println("receive2="+message.getPayload()+"------->"+Thread.currentThread().getName());channel.basicReject(((Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)),true);}}

此时第二个消费者拒绝了所有消息,第一个消费者消费了所有消息。

这就是 Work queues 这种情况。

3.3 Publish/Subscrite

再来看发布订阅模式,这种情况是这样:

一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。需要注意的是,如果将消息发送到一个没有队列绑定的 Exchange上面,那么该消息将会丢失,这是因为在 RabbitMQ 中 Exchange 不具备存储消息的能力,只有队列具备存储消息的能力,如下图:

这种情况下,我们有四种交换机可供选择,分别是:

Direct

Fanout

Topic

Header

我分别来给大家举一个简单例子看下。

3.3.1 Direct

DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上,例如消息队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的消息会被该消息队列接收。DirectExchange 的配置如下:

@ConfigurationpublicclassRabbitDirectConfig{publicfinalstaticStringDIRECTNAME="javaboy-direct";@BeanQueuequeue(){returnnewQueue("hello-queue");}@BeanDirectExchangedirectExchange(){returnnewDirectExchange(DIRECTNAME,true,false);}@BeanBindingbinding(){returnBindingBuilder.bind(queue()).to(directExchange()).with("direct");}}

首先提供一个消息队列Queue,然后创建一个DirectExchange对象,三个参数分别是名字,重启后是否依然有效以及长期未用时是否删除。

创建一个Binding对象将Exchange和Queue绑定在一起。

DirectExchange和Binding两个Bean的配置可以省略掉,即如果使用DirectExchange,可以只配置一个Queue的实例即可。

再来看看消费者:

@ComponentpublicclassDirectReceiver{@RabbitListener(queues="hello-queue")publicvoidhandler1(Stringmsg){System.out.println("DirectReceiver:"+msg);}}

通过 @RabbitListener 注解指定一个方法是一个消息消费方法,方法参数就是所接收到的消息。然后在单元测试类中注入一个 RabbitTemplate 对象来进行消息发送,如下:

@ConfigurationpublicclassHelloWorldConfig{publicstaticfinalStringHELLO_WORLD_QUEUE_NAME="hello_world_queue";@BeanQueuequeue1(){returnnewQueue(HELLO_WORLD_QUEUE_NAME);}}0

最终执行结果如下:

3.3.2 Fanout

FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,在这种策略中,routingkey 将不起任何作用,FanoutExchange 配置方式如下:

@ConfigurationpublicclassHelloWorldConfig{publicstaticfinalStringHELLO_WORLD_QUEUE_NAME="hello_world_queue";@BeanQueuequeue1(){returnnewQueue(HELLO_WORLD_QUEUE_NAME);}}1

在这里首先创建 FanoutExchange,参数含义与创建 DirectExchange 参数含义一致,然后创建两个 Queue,再将这两个 Queue 都绑定到 FanoutExchange 上。接下来创建两个消费者,如下:

@ConfigurationpublicclassHelloWorldConfig{publicstaticfinalStringHELLO_WORLD_QUEUE_NAME="hello_world_queue";@BeanQueuequeue1(){returnnewQueue(HELLO_WORLD_QUEUE_NAME);}}2

两个消费者分别消费两个消息队列中的消息,然后在单元测试中发送消息,如下:

@ConfigurationpublicclassHelloWorldConfig{publicstaticfinalStringHELLO_WORLD_QUEUE_NAME="hello_world_queue";@BeanQueuequeue1(){returnnewQueue(HELLO_WORLD_QUEUE_NAME);}}3

注意这里发送消息时不需要 routingkey,指定 exchange 即可,routingkey 可以直接传一个 null。

最终执行日志如下:

3.3.3 Topic

TopicExchange 是比较复杂但是也比较灵活的一种路由策略,在 TopicExchange 中,Queue 通过 routingkey 绑定到 TopicExchange 上,当消息到达 TopicExchange 后,TopicExchange 根据消息的 routingkey 将消息路由到一个或者多个 Queue 上。TopicExchange 配置如下:

@ConfigurationpublicclassHelloWorldConfig{publicstaticfinalStringHELLO_WORLD_QUEUE_NAME="hello_world_queue";@BeanQueuequeue1(){returnnewQueue(HELLO_WORLD_QUEUE_NAME);}}4

首先创建 TopicExchange,参数和前面的一致。然后创建三个 Queue,第一个 Queue 用来存储和 “xiaomi” 有关的消息,第二个 Queue 用来存储和 “huawei” 有关的消息,第三个 Queue 用来存储和 “phone” 有关的消息。

将三个 Queue 分别绑定到 TopicExchange 上,第一个 Binding 中的 “xiaomi.#” 表示消息的 routingkey 凡是以 “xiaomi” 开头的,都将被路由到名称为 “xiaomi” 的 Queue 上,第二个 Binding 中的 “huawei.#” 表示消息的 routingkey 凡是以 “huawei” 开头的,都将被路由到名称为 “huawei” 的 Queue 上,第三个 Binding 中的 “#.phone.#” 则表示消息的 routingkey 中凡是包含 “phone” 的,都将被路由到名称为 “phone” 的 Queue 上。

接下来针对三个 Queue 创建三个消费者,如下:

@ConfigurationpublicclassHelloWorldConfig{publicstaticfinalStringHELLO_WORLD_QUEUE_NAME="hello_world_queue";@BeanQueuequeue1(){returnnewQueue(HELLO_WORLD_QUEUE_NAME);}}5

然后在单元测试中进行消息的发送,如下:

@ConfigurationpublicclassHelloWorldConfig{publicstaticfinalStringHELLO_WORLD_QUEUE_NAME="hello_world_queue";@BeanQueuequeue1(){returnnewQueue(HELLO_WORLD_QUEUE_NAME);}}6

根据 RabbitTopicConfig 中的配置,第一条消息将被路由到名称为 “xiaomi” 的 Queue 上,第二条消息将被路由到名为 “huawei” 的 Queue 上,第三条消息将被路由到名为 “xiaomi” 以及名为 “phone” 的 Queue 上,第四条消息将被路由到名为 “huawei” 以及名为 “phone” 的 Queue 上,最后一条消息则将被路由到名为 “phone” 的 Queue 上。

3.3.4 Header

HeadersExchange 是一种使用较少的路由策略,HeadersExchange 会根据消息的 Header 将消息路由到不同的 Queue 上,这种策略也和 routingkey无关,配置如下:

@ConfigurationpublicclassHelloWorldConfig{publicstaticfinalStringHELLO_WORLD_QUEUE_NAME="hello_world_queue";@BeanQueuequeue1(){returnnewQueue(HELLO_WORLD_QUEUE_NAME);}}7

这里的配置大部分和前面介绍的一样,差别主要体现的 Binding 的配置上,第一个 bindingName 方法中,whereAny 表示消息的 Header 中只要有一个 Header 匹配上 map 中的 key/value,就把该消息路由到名为 “name-queue” 的 Queue 上,这里也可以使用 whereAll 方法,表示消息的所有 Header 都要匹配。whereAny 和 whereAll 实际上对应了一个名为 x-match 的属性。bindingAge 中的配置则表示只要消息的 Header 中包含 age,不管 age 的值是多少,都将消息路由到名为 “age-queue” 的 Queue 上。

接下来创建两个消息消费者:

@ConfigurationpublicclassHelloWorldConfig{publicstaticfinalStringHELLO_WORLD_QUEUE_NAME="hello_world_queue";@BeanQueuequeue1(){returnnewQueue(HELLO_WORLD_QUEUE_NAME);}}8

注意这里的参数用 byte 数组接收。然后在单元测试中创建消息的发送方法,这里消息的发送也和 routingkey 无关,如下:

@ConfigurationpublicclassHelloWorldConfig{publicstaticfinalStringHELLO_WORLD_QUEUE_NAME="hello_world_queue";@BeanQueuequeue1(){returnnewQueue(HELLO_WORLD_QUEUE_NAME);}}9

这里创建两条消息,两条消息具有不同的 header,不同 header 的消息将被发到不同的 Queue 中去。

最终执行效果如下:

3.4 Routing

这种情况是这样:

一个生产者,一个交换机,两个队列,两个消费者,生产者在创建 Exchange 后,根据 RoutingKey 去绑定相应的队列,并且在发送消息时,指定消息的具体 RoutingKey 即可。

如下图:

这个就是按照 routing key 去路由消息,我这里就不再举例子了,大家可以参考 3.3.1 小结。

3.5 Topics

这种情况是这样:

一个生产者,一个交换机,两个队列,两个消费者,生产者创建 Topic 的 Exchange 并且绑定到队列中,这次绑定可以通过 * 和 # 关键

声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
找专业防水队做完还漏水怎么维权 法院会受理房屋漏水造成的纠纷吗? 巴西龟最长活多久,家养!!! 养胃的药最好的是什么啊 婴儿积食发烧不愿吃药怎么办 板门穴位在哪个部位 手机设置放偷看的方法? 凝结水回收器生产厂家? 个人账户养老金预测公式:现有5万元,缴费20年,能领多少钱? 临沂比较有名的男装品牌 48岁离二次婚的女人如何 离两次婚的男人和离两次婚的女人有什么区别? 空调多少瓦算大? 吴倩成功带火了复古“毛毛裤”,堪称装嫩神器,不给00后留退路_百度... 玻尿酸隆鼻后注意事项 襄樊市中心医院的特色科室 成都大学附属医院能看大叶性肺炎吗? 为什么steam下载游戏无互联网连接网络没问题但steam连不上去怎么办 湘潭市中心医院呼吸科现拥有哪些先进的医疗水平? 湘潭中心医院呼吸科是否能够治疗间质性肺炎?稳当吗? steam突然连不上网了?steam无法连接到网络怎么办? 湘潭市中心医院呼吸科的对于气管镜下介入治疗这方面的经验怎么样呢?技... 电脑有网为什么steam就是连不上网 steam明明有网络却说网络连接不上 电动车能不能通过快递公司寄回家 卖鱼小伙与美女总裁是什么电视剧 嘿 亲爱的另外一个情侣网名 谁有像“嘿丶你的益达”这种搞怪网名求大神帮助 嘿富泪的另一个情侣网名是什么 求6字情侣网名,类似于“嘿,MEN”、、两个人打招呼的就可以了 捷豹p1290故障码解释和消除方法,捷豹P1290故障码怎么解决? 再婚以后的你生活的怎么样? ...道奇,吉普p1290故障码解释和消除方法,P1290故障码怎么解决? 五十铃p1290故障码解释和消除方法,五十铃P1290故障码怎么解决? 尼桑,英菲尼迪p1290故障码解释和消除方法,P1290故障码怎么解决? 路虎p1290故障码解释和消除方法,路虎P1290故障码怎么解决? 怎么看离二次婚的女人 江淮m3p1290故障码怎么解决的 对于一个离了二次婚的女人,第三次会幸福吗 我和我女朋友分手了!我想要离开,她不让!可是这几天我们老吵,我又想到... lol邪恶小法师到后期的法术强度竟然达到了1500,这是怎么怎么做到的_百 ... LOL里面推荐一个法强比较高的法师 ...吵架可是一说分手她就闹.而且是那种没分寸的闹.我该怎么办 英雄联盟哪个英雄出高法伤最划算? 他不想和他女朋友分手,每次我提分手的时候他又哭是为什么? 当电脑关机时,微信还能在线吗? ...不同意,提出青春损失费5万,可是我拿不出,我该怎么办? 电脑上显示通过微信接收离线消息,此时的微信是关闭了还是运行中 微信离线消息怎么用的 ...提建议也不管用,还是为所欲为。想分手真的舍不得,我该怎么办啊...