RabbitMQ深度探索:五种消息模式

RabbitMQ深度探索:五种消息模式

RabbitMQ深度探索:五种消息模式

最新推荐文章于 2025-06-02 22:00:00 发布

苏-言

最新推荐文章于 2025-06-02 22:00:00 发布

阅读量897

收藏

5

点赞数

22

CC 4.0 BY-SA版权

文章标签:

rabbitmq

分布式

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

本文链接:https://blog.csdn.net/SOS_suyan/article/details/145438504

RabbitMQ 工作队列:

默认的传统队列是为均摊消费,存在不公平性;如果每个消费者速度不一样的情况下,均摊消费是不公平的,应该是能者多劳采用工作队列模式:

在通道中只需要设置 baseicQos 的值即可

表示 MQ 服务器每次只会给消费者推送 n 条消息必须手动应答之后才会继续发送

生产者:

public class ProducerFanout {

private static final String QUEUE_NAME = "BoyatopMamber";

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

// 1.创建新的连接

Connection connection = RabbitMQConnection.getConnection();

// 2.设置 channel

Channel channel = connection.createChannel();

for (int i = 0; i < 10; i++) {

// 3.发送消息

String msg = "Hello my Bro" + i;

channel.confirmSelect();

channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());

boolean result = channel.waitForConfirms();

}

// 4.关闭资源

channel.close();

connection.close();

}

}

消费者1:

public class Consumer1 {

//定义队列

private static final String QUEUE_NAME = "BoyatopMamber";

public static void main(String[] args) throws IOException, TimeoutException {

//创建连接

Connection connection = RabbitMQConnection.getConnection();

//创建通道

final Channel channel = connection.createChannel();

channel.basicQos(1);

DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String msg = new String(body,"UTF-8");

System.out.println("用户1:" + msg);

channel.basicAck(envelope.getDeliveryTag(),false);

}

};

//监听消息

channel.basicConsume(QUEUE_NAME,false,defaultConsumer);

}

}

消费者2:

public class Consumer2 {

//定义队列

private static final String QUEUE_NAME = "BoyatopMamber";

public static void main(String[] args) throws IOException, TimeoutException {

//创建连接

Connection connection = RabbitMQConnection.getConnection();

//创建通道

final Channel channel = connection.createChannel();

channel.basicQos(3);

DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String msg = new String(body,"UTF-8");

System.out.println("消费者2:" + msg);

channel.basicAck(envelope.getDeliveryTag(),false);

}

};

//监听消息

channel.basicConsume(QUEUE_NAME,false,defaultConsumer);

}

}

RabbitMQ 交换机类型:

Direct exchange:直连交换机Fanout exchange:扇形交换机Topic exchange:主体交换机Headers exchange:头交换机Virtual Hostos:区分不同的团队

队列:存放消息交换机:路由消息存放在那个队列中,类似于 Nginx路由:key 分发规则

RabbitMQ Fanout 发布订阅:

生产者发送一条消息,经过交换机转发到多个不同的队列,多个不同的队列就多个不同的消费者步骤:

需要创建两个队列,每个队列都对应一个消费者队列需要绑定我们交换机生产者投递消息到交换机中,交换机再将消息分配给两个队列中都存放起来消费者从队列中获取消息

生产者:

public class OrderConsumer {

//定义队列

private static final String QUEUE_NAME = "BoyatopOrder";

//定义交换机

private static final String EXCHANGE_NAME = "fanout_exchange";

public static void main(String[] args) throws IOException, TimeoutException {

///创建连接

Connection connection = RabbitMQConnection.getConnection();

//创建通道

Channel channel = connection.createChannel();

//关联队列

channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");

DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String msg = new String(body,"UTF-8");

System.out.println("订单接收:" + msg);

}

};

//监听消息

channel.basicConsume(QUEUE_NAME,true,defaultConsumer);

}

}

数据消费者:

public class MamConsumer {

//定义队列

private static final String QUEUE_NAME = "BoyatopMamber";

//定义交换机的名称

private static final String EXCHANGE_NAME = "fanout_exchange";

public static void main(String[] args) throws IOException, TimeoutException {

//创建连接

Connection connection = RabbitMQConnection.getConnection();

//创建通道

final Channel channel = connection.createChannel();

//关联队列

channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");

DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String msg = new String(body,"UTF-8");

System.out.println("消费者接收:" + msg);

}

};

//开启监听

channel.basicConsume(QUEUE_NAME,true,defaultConsumer);

}

}

订单消费者:

public class OrderConsumer {

//定义队列

private static final String QUEUE_NAME = "BoyatopOrder";

//定义交换机

private static final String EXCHANGE_NAME = "fanout_exchange";

public static void main(String[] args) throws IOException, TimeoutException {

///创建连接

Connection connection = RabbitMQConnection.getConnection();

//创建通道

Channel channel = connection.createChannel();

//关联队列

channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");

DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String msg = new String(body,"UTF-8");

System.out.println("订单接收:" + msg);

}

};

//监听消息

channel.basicConsume(QUEUE_NAME,true,defaultConsumer);

}

}

Direct 路由模式:

当交换机类型为 direct 类型时,根据队列绑定的路由转发到具体的队列中存放消息生产者:

public class Producer {

//定义交换机

private static final String EXCHANGE_NAME = "newDirect_exchange";

public static void main(String[] args) throws IOException, TimeoutException {

//创建连接

Connection connection = RabbitMQConnection.getConnection();

//创建 channel

Channel channel = connection.createChannel();

//通道关联交换机

channel.exchangeDeclare(EXCHANGE_NAME,"direct",true);

//发送消息

for (int i = 0; i < 10; i++) {

String msg = "生产消息 --- 路由模式";

System.out.println(msg + i);

channel.basicPublish(EXCHANGE_NAME,"mail",null,msg.getBytes());

channel.basicPublish(EXCHANGE_NAME,"sms",null,msg.getBytes());

}

channel.close();

connection.close();

}

}

邮件消费者:

public class MailConsumer {

//定义交换机

private static final String EXCHANGE_NAME = "newDirect_exchange";

//定义队列

private static final String QUEUE_NAME = "newDirectqueueOne";

public static void main(String[] args) throws IOException, TimeoutException {

//创建连接

Connection connection = RabbitMQConnection.getConnection();

//创建 channel

Channel channel = connection.createChannel();

//关联队列

channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"mail");

DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String msg = new String(body,"UTF-8");

System.out.println("邮件消费者接收信息:" + msg);

}

};

//监听队列

channel.basicConsume(QUEUE_NAME,true,defaultConsumer);

}

}

短信消费者:

public class SmsConsumer {

//定义交换机

private static final String EXCHANGE_NAME = "newDirect_exchange";

//定义队列

private static final String QUEUE_NAME = "newDirectqueueTwo";

public static void main(String[] args) throws IOException, TimeoutException {

//创建连接

Connection connection = RabbitMQConnection.getConnection();

//创建 channel

Channel channel = connection.createChannel();

//关联队列

channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"sms");

DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String msg = new String(body,"UTF-8");

System.out.println("短信消费者接收消息:" + msg);

}

};

//监听队列

channel.basicConsume(QUEUE_NAME,true,defaultConsumer);

}

}

Topic 主体模式:

当交换机类型为 topic 类型时,根据队列绑定的路由键模糊转发到具体队列中存放#:表示支持匹配多个词*:表示只能匹配一个词

确定要放弃本次机会?

福利倒计时

:

:

立减 ¥

普通VIP年卡可用

立即使用

苏-言

关注

关注

22

点赞

5

收藏

觉得还不错?

一键收藏

知道了

0

评论

分享

复制链接

分享到 QQ

分享到新浪微博

扫一扫

举报

举报

7大模式!C# RabbitMQ消息队列深度解析与实战指南

java专栏

01-07

1077

RabbitMQ 是一个开源的消息代理软件,实现了高级消息队列协议(AMQP)。它由Erlang语言编写,支持多种消息传递协议,如AMQP、MQTT和STOMP。RabbitMQ广泛应用于分布式系统中,用于实现应用解耦、异步处理和流量控制。通过今天的分享,相信你已经对C#中使用RabbitMQ有了一个全面的了解。无论你是初学者还是有一定经验的开发者,希望这篇文章能够帮助你更好地掌握RabbitMQ的使用方法。如果你有任何疑问或者建议,欢迎在评论区留言交流哦!让我们一起加油,成为更加优秀的开发者吧!🌟✨。

参与评论

您还未登录,请先

登录

后发表或查看评论

RabbitMQ支持消息的模式

qq_44299529的博客

04-25

1680

当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;一个消费者一条,按均分配;

rabbitMQ的五种消息模式

m0_63086385的博客

11-24

1419

rabbitMQ的五种消息模式

RocketMQ 三大消息类型深度解析:普通消息、延迟消息、事务消息

qq_41323045的博客

05-30

805

*** 短信发送消息*/@Data// 重点:需要增加消息对应的 Topic// 重点:需要增加消息对应的 Topic/*** 短信日志编号*/@NotNull(message = "短信日志编号不能为空")/*** 手机号*/@NotNull(message = "手机号不能为空")/*** 短信渠道编号*/@NotNull(message = "短信渠道编号不能为空")/*** 短信 API 的模板编号*/

RabbitMQ详解(三):消息模式(fanout、direct、topic、work)

weixin_45181611的博客

05-10

4122

消息模式详解

RabbitMQ的几种模式介绍跟代码演示

2301_81018165的博客

02-20

497

工作队列模式实际上是一种竞争关系的模式,多个消费者之间是竞争关系,即一条消息如果被某个消费者消费了,那么其他的消费者就获取不到了。Work Queues与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。跟上面的差不多,这个的区别就是每一个消费者都定义了一个关键字,交换机会根据关键字的不同发送给不同的消费者。就是先让生产者先把消息发送到交换机上面去,再让交换机发送到消费者端。生产者生产,消费者消费,简单的点对点模式。6.自定义模式 (延迟队列)

RabbitMQ五种消息模式

qq_43390704的博客

12-21

3659

1,简单模式

工作的流程:

当生产者生产消息后,将消息发往队列.

当队列中有消息时,消费者会实时的监听队列中的消息.如果有消息则会执行消息

2,工作模式

默认的传统队列是为均摊消费,存在不公平性;如果每个消费者速度不一样的情况下,均摊消费是不公平的,应该是能者多劳。

采用工作队列

在通道中只需要设置basicQos为1即可,表示MQ服务器每次只会给消费者推送1条消息必须手动ack确认之后才会继续发送。channel.basicQos(1);

public class ...

RabbitMQ深度解析:从基础实践到高阶架构设计

最新发布

KE17RS的博客

06-02

991

RabbitMQ深度解析:从基础实践到高阶架构设计

RabbitMQ vs Kafka:零拷贝技术深度解析!

Leaton的博客

02-21

772

传统的 I/O 操作中,数据需要在内核空间和用户空间之间多次拷贝,而零拷贝技术通过减少甚至消除这些拷贝操作,显著提升了数据传输的效率。这篇文章将深入浅出地对比 RabbitMQ 和 Kafka 的零拷贝技术实现,帮助你理解它们的优劣,并结合代码示例让你轻松掌握如何优化消息队列的性能!例如,在使用内存交换机(Memory-based Exchanges)时,消息可以在 Broker 内部以指针的形式传递,避免了频繁的数据拷贝。虽然 RabbitMQ 在某些场景下支持零拷贝技术,但它并不是其核心设计的一部分。

架构模式深度探索:揭秘DDD与微服务架构的5种完美结合

[架构模式深度探索:揭秘DDD与微服务架构的5种完美结合](https://media.geeksforgeeks.org/wp-content/uploads/20240221122149/Domain-Driven-Design.webp) # 摘要 随着软件架构复杂性的增长,架构模式在软件开发中...

RabbitMQ-in-Depth:深度使用RabbitMQ的示例和材料

05-12

**RabbitMQ 深度探索** RabbitMQ 是一个高度可扩展且极其灵活的消息队列服务,基于 AMQP(Advanced Message Queuing Protocol)协议,广泛应用于分布式系统、微服务架构以及不同应用间的异步通信。它允许应用程序...

RabbitMQ的四种消息传递模式与演示代码

jakelihua

04-07

1916

例如,用户在网站上提交了一个长时间处理的任务(如生成报表、发送邮件等),为了提高用户体验,可以将任务提交到RabbitMQ的任务队列中,然后由后台的消费者进行异步处理。通过使用Fanout模式,可以将日志消息广播到所有相关的队列中,每个消费者只需要关注自己负责处理的日志级别,从而实现了日志的分发和处理。交换机会将收到的消息广播到与其绑定的所有队列中。Fanout模式是RabbitMQ中的一种消息传递模式,它将消息广播到所有绑定到Exchange的队列中,即使在消息发布之后才创建的队列,也能接收到消息。

RabbitMQ的5种消息队列

YIYIYI

08-16

4017

RabbitMQ的5种消息队列

MQ消息的几种模式

Here_的博客

08-17

1035

```csharp

private Material material;

public float moveSpeed = 0.5f;

void Start()

{

material = GetComponent().sharedMaterial;

}

void Update()

{

if (material)

{

float m_Offset .

RabbitMQ-03(实战 、RabbitMQ 的六种消息模式)

想去很远地方

08-10

1423

Header:header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配队列。主题交换背后的逻辑类似于直接交换 - 使用特定路由键发送的消息将被传递到使用匹配绑定键绑定的所有队列。此时再图形化界面查看交换机,发现并不是像队列一样可以获取到消息,猜测交换机收到的消息如果不能立即发送就会销毁。上一个模式中的交换机将所有消息广播给所有消费者。公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;...

RabbitMQ五种模式及使用场景

M_J_R的博客

05-06

4758

RabbitMQ基本模式

1.基本模型

2.RabbitMQ应用场景

MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。

开发中消息队列通常有如下应用场景:

1、任务异步处理。

将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。

2、应用程序解耦合

RabbitMQ的五种消息模型

清风且从容,何需念君安。

01-13

3753

RabbitMQ提供了多种消息模型,官网上第6种是RPC不属于常规的消息队列。属于消息模型的是前5种:简单的一对一模型工作队列模型 ,一个生产者将消息分发给多个消费者发布/订阅模型 ,生产者发布消息,多个消费者同时收取路由模型 ,生产者通过关键字发送消息给特定消费者主题模型 ,路由模式基础上,在关键字里加入了通配符。

【RabbitMQ】万字整理RabbitMQ的五种消息模式

m0_66584716的博客

01-09

962

RabbitMQ的五种消息模式(含代码示例)

RabbitMQ之五种消息模型

qq_57907966的博客

02-11

2205

虚拟主机:类似于mysql中的database。他们都是以“/”开头。

C#消息通讯探索:MSMQ、ActiveMQ、RabbitMQ解析

消息发送有两种模式:快递方式(快速但不持久)和可恢复模式(慢速但能抵抗故障)。 MSMQ的主要优点是其异步通信模式,发送方无需等待接收方的响应即可继续执行,提高了系统的整体效率。同时,由于消息队列的容错性...

相关推荐

卡塔尔世界杯贺炜5大金句,足球解说的天花板
365bet哪个国家的

卡塔尔世界杯贺炜5大金句,足球解说的天花板

梦里花落知多少
365彩票下载1.0.0老版本

梦里花落知多少

世界杯-25分钟入三球 瑞典3-0墨西哥携手出线
日博365哪个是真的

世界杯-25分钟入三球 瑞典3-0墨西哥携手出线

血清及血浆制备实验
日博365哪个是真的

血清及血浆制备实验

在查找 App 中并未看见我的 AirPods Pro (第 2 代) 登录设备也并未看见
365彩票下载1.0.0老版本

在查找 App 中并未看见我的 AirPods Pro (第 2 代) 登录设备也并未看见

什么是工银e支付?
365彩票下载1.0.0老版本

什么是工银e支付?