MQ

  1. 消息队列,FIFO先进先出
  2. 可流量消峰、应用解耦,异步处理

RabbitMQ核心部分

简单模式、工作模式、发布订阅模式、路由模式、主题模式、发布确认模式

  1. 一个Broker实体多个Exchange交换机多个Queue队列
  2. 一个连接(TCP)多个信道(连接中的逻辑连接)

image-20230613135334246

docker下载

1
2
3
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq:3.8.8-management

docker run -d --name my_rabbitmq -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=< 你的用户名> -e RABBITMQ_DEFAULT_PASS=<你的密码> a64a4ae7bc1f
1
2
3
4
5
6
7
8
9
10
11
12
<!--rabbitmq 依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>

发送第一个消息

生产者—>队列—>消费者

生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Producer {
private static final String QUEUE_NAME = "HELLO";

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("123456");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
/**
* 参数介绍
* 1、队列名称
* 2、队列里面的消息是否持久化(默认在内存中)
* 3、该队列是否只能一个消费者进行消费,是否进行共享,true为共享消费
* 4、是否自动删除队列,最后一个消费者断开连接后
* 5、其他参数
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**
* 参数介绍
* 1、发送到那个交换机
* 2、路由的Key值是那个,本次队列名称
* 3、其他参数
* 4、消息
*/
channel.basicPublish("", QUEUE_NAME, null, "HELLO WORLD".getBytes());
}
}
消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Consumer {
private static final String QUEUE_NAME = "HELLO";

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("123456");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

DeliverCallback failcallback = (tag, message) -> System.out.println(message);
CancelCallback cancelCallback = System.out::println;

/**
* 参数介绍
* 1、队列名
* 2、消费成功是否自动应答
* 3、成功消费的回调
* 4、取消消费的回调
*/
channel.basicConsume(QUEUE_NAME,true, failcallback, cancelCallback);

}
}

spring中提供了一个模板类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static final String QUEUE = "queue";

@Resource
RabbitTemplate rabbitTemplate;

@Test
void sendMessage() {
rabbitTemplate.convertAndSend(QUEUE, "hello");
}

@Test
void receiveMessage() {
Message receive = rabbitTemplate.receive(QUEUE);
System.out.println(receive.getBody());
}

配置信息

1
2
3
4
5
6
7
8
9
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: root
password: 123456
listener:
simple:
prefetch: 1 # 预获取1条消息,防止多个消费者对一个队列进行竞争,导致消息堆积(每次只能获取1条消息,处理完才能获取下一条消息)

Fanout交换机

广播(Fanout)

交换机将接收到的消息分配给每一个队列进行存储,后有消费者消费

image-20231111134730357

1
2
3
4
5
6
7
8
9
10
11
12
13
// 消费消息
@Component
public class ListenerMQ {
@RabbitListener(queues = "queue")
public void receive(String msg) {
System.out.println("queue receive msg:" + msg);
}

@RabbitListener(queues = "queue1")
public void receive1(String msg) {
System.out.println("queue1 receive msg:" + msg);
}
}
1
2
3
4
5
6
7
8
9
10
// 发送消息
private static final String EXCHANGE = "amq.fanout";

@Resource
RabbitTemplate rabbitTemplate;

@Test
void sendMessage() {
rabbitTemplate.convertAndSend(EXCHANGE, null, "hello");
}
绑定队列

image-20231111140639727

image-20231111140653460

定向(Direct)

交换机对固定的队列进行发消息,根据routingkey来区别

image-20231111141143402

1
2
3
4
5
6
7
8
9
10
11
// 发送方,需要执行routingkey
private static final String EXCHANGE = "div.fanout";

@Resource
RabbitTemplate rabbitTemplate;

@Test
void sendMessage() {
rabbitTemplate.convertAndSend(EXCHANGE, "red", "hello"); // red在绑定队列的时候就设置了
}

1
2
3
4
5
// 接收方
@RabbitListener(queues = "queue")
public void receive(String msg) {
System.out.println("queue receive msg:" + msg);
}

话题(Topic)

可选定多个队列进行发送

多个用.分割,#代表0个或者多个单词,例如: chain.#

*代表一个单词

image-20231111142424152

image-20231111144601398

1
2
3
4
5
6
7
8
9
10
//发送方
private static final String EXCHANGE = "div.topic";

@Resource
RabbitTemplate rabbitTemplate;

@Test
void sendMessage() {
rabbitTemplate.convertAndSend(EXCHANGE, "queue.all", "hello"); // routingkey为null的话,#能匹配到
}
1
// 接收方与上方的一样

声明队列交换机

第一种使用配置交换机、配置队列、配置绑定关系,但是这种配置会比较繁琐, 产生比较多的代码

第二种在消费者端使用注解@RabbitListener配置 (推荐)

1
2
3
4
5
6
7
8
9
// 第二种使用
@RabbitListener(bindings = @QueueBinding(
value = @Queue("queue"),
exchange = @Exchange(name = "div.exchange", type = "topic", durable = "true"),
key = {"#"}
))
public void receive(String msg) {
System.out.println("queue receive msg:" + msg);
}

消息转换器

实现 Serializable 并且注入序列化类,或者使用Message来手动获取body再手动反序列化。

两种都需要序列化,一种配置自动的另一种需要手动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
...
messageProperties.setContentType("text/plain");
messageProperties.setContentEncoding(this.defaultCharset);
} else if (object instanceof Serializable) {
try {
bytes = SerializationUtils.serialize(object); // 序列化
} catch (IllegalArgumentException var5) {
throw new MessageConversionException("failed to convert to serialized Message content", var5);
...

public static byte[] serialize(Object object) {
if (object == null) {
return null;
} else {
ByteArrayOutputStream stream = new ByteArrayOutputStream();

try {
(new ObjectOutputStream(stream)).writeObject(object); // ObjectOutputStream序列化,说以我们需要注入一个用于序列化的实现,例如jackson
} catch (IOException var3) {
throw new IllegalArgumentException("Could not serialize object of type: " + object.getClass(), var3);
}

return stream.toByteArray();
}
}

生产者可靠性

生产者重连

生产者连接失败后的重试设置,但是会导致当前线程堵塞住,如果需要非堵塞的话。需要将重试关闭或者使用异步编程之类的来将消息发送异步处理。

1
2
3
4
5
6
7
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
initial-interval: 1000ms # 失败后重新初始化时间
max-attempts: 3 # 最大重试次数
multiplier: 1 # 重试间隔时间的倍数
enabled: true # 超时重试开启

生产者确认

消息发送时失败处理,

返回值状态
  1. 消息投递到了MQ,但是路由失败,返回ACK成功
  2. 消息投递到了MQ,并且入队成功,返回ACK成功
  3. 消息投递MQ,并且消息持久化成功, 返回ACK成功
  4. 其他都会返回NACK失败

总结:只要消息到达MQ中,都会返回ACK

1
2
publisher-confirm-type: correlated # 开启消息确认机制,并使用消息的correlationId进行匹配
publisher-returns: true # 开启消息失败后重新发送机制(多半路由配置不正确问题导致的)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 重写 publisher-returns(returnCallBack)
@Configuration
public class CustomReturnCallback implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
// MQ失败返回的一些信息
System.out.println(returnedMessage);
}
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// publisher-confirm-type (confirmCallBack)
CorrelationData correlationData = new CorrelationData(IdUtil.randomUUID());
correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
// future发生异常时会触发
}

@Override
public void onSuccess(CorrelationData.Confirm result) {
if (result.isAck()) {
// 发送成功
} else {
// 发送失败,尝试重试发送,也可以加入重试次数。如果重试都失败了,可以警告报警之类的
}
}
});
rabbitTemplate.convertAndSend(EXCHANGE, "queue.all", new User("jason", 12));
ThreadUtil.sleep(2, TimeUnit.SECONDS);

数据持久化

消息持久化

会将消息持久化到硬盘中,并且重启MQ并不会丢失消息

1
2
3
4
5
6
@Test
void sendMessage() {
MessageBuilder messageBuilder = MessageBuilder.withBody("123".getBytes());
messageBuilder.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
rabbitTemplate.convertAndSend(EXCHANGE, "queue.all", messageBuilder);
}

Lazy Queue

RabbitMQ 3.6开始,增加了懒惰队列。

  • 当消息MQ接收到消息后直接存储硬盘中,后续使用会从硬盘中读取(默认2048条)
  • 支持数百万条消息,防止消息丢失
  • 3.12后默认强制开启懒惰队列,无法更改
1
2
3
4
5
6
7
8
9
10
// 方式一
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue", durable = "true"),
exchange = @Exchange(name = "div.exchange", type = "topic"),
arguments = @Argument(name = "x-queue-mode", value = "lazy"),
key = {"#"}
))
public void receive(User msg) {
System.out.println("queue receive msg:" + msg);
}
1
2
3
4
5
// 方式二
@Bean
public Queue queueBuilder() {
return QueueBuilder.durable("queue").lazy().build();
}

消费者可靠性

消费者消费消息后,应该给MQ给一个回执

  • ack:成功处理消息,MQ删除消息
  • nack:失败处理消息,MQ需要再次投递
  • reject:消费者拒接此消息,MQ删除此消息
1
2
3
4
5
6
// 配置
simple:
acknowledge-mode: auto
// none 默认,什么都不做
// auto 根据异常返回MQ: ack、nack、reject
// manual 手动来提交

消息失败处理

如果消费者失败将消息丢回MQ,然后MQ又重新投递给消费者,然后消费者又消费失败,开始循环。则需要有MessageRecover接口来处理,它包含三种不同的实现

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,放回MQ
  • RepublishMessageRecoverer:重试耗尽后,将消息放到其他交换机中
1
2
3
4
5
6
7
8
9
10
11
// 消费者重试
listener:
simple:
prefetch: 1 # 预获取1条消息,防止多个消费者对一个队列进行竞争,导致消息堆积(每次只能获取1条消息,处理完才能获取下一条消息)
acknowledge-mode: auto
retry:
enabled: true # 开启消息重试机制
max-attempts: 3 # 最大重试次数
initial-interval: 1s # 失败后重新初始化时间
multiplier: 1 # 重试间隔时间的倍数
stateless: true # 开启无状态模式,不保存重试记录

业务的幂等性

  1. 利用为每个消息设置一个唯一id, 需要再json序列化对象中设置。当消费者消费后将id保存到数据库中,然后如果有相同消息查询数据库判定是否有此id,有就直接放弃掉者消息处理(业务侵入,对性能也有影响)
  2. 业务幂等,通过改变业务来保障,例如:如果是更新某条记录,记录中有状态这个值,每次更新都会更新这个值。我们修改前可以先查询再修改。或者在sql上进行操作加条件and status = ?

延迟消息

1、死信交换机

不需要的消息,过期的消息(达到了队列中消息过期时间)、消息队列满了,最早的消息成为死信

死信交换机,需要给队列设置属性:dead-letter-exchange = 某个交换机

2、延迟消息插件

插件地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

1
2
3
4
5
6
7
8
9
# 复制到容器下 c94f1e7dbfc0容器id
docker cp rabbitmq_delayed_message_exchange-3.9.0.ez c94f1e7dbfc0:/opt/rabbitmq/plugins
# 进入容器
docker exec -it c94f1e7dbfc0 /bin/bash
# 安装
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 安装消息管理插件
rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management
# 进入MQ网页端查看是否安装上插件