MQ
- 消息队列,FIFO先进先出
- 可流量消峰、应用解耦,异步处理
RabbitMQ核心部分
简单模式、工作模式、发布订阅模式、路由模式、主题模式、发布确认模式
- 一个Broker实体多个Exchange交换机多个Queue队列
- 一个连接(TCP)多个信道(连接中的逻辑连接)

docker下载
1 2 3
| docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq:3.8.8-management
docker run -d --name my_rabbitmq -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=< 你的用户名> -e RABBITMQ_DEFAULT_PASS=<你的密码> a64a4ae7bc1f
|
1 2 3 4 5 6 7 8 9 10 11 12
| <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency>
<dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency>
|
发送第一个消息
生产者—>队列—>消费者
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class Producer { private static final String QUEUE_NAME = "HELLO";
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("root"); factory.setPassword("123456");
Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish("", QUEUE_NAME, null, "HELLO WORLD".getBytes()); } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class Consumer { private static final String QUEUE_NAME = "HELLO";
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("root"); factory.setPassword("123456");
Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
DeliverCallback failcallback = (tag, message) -> System.out.println(message); CancelCallback cancelCallback = System.out::println;
channel.basicConsume(QUEUE_NAME,true, failcallback, cancelCallback);
} }
|
spring中提供了一个模板类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| private static final String QUEUE = "queue";
@Resource RabbitTemplate rabbitTemplate;
@Test void sendMessage() { rabbitTemplate.convertAndSend(QUEUE, "hello"); }
@Test void receiveMessage() { Message receive = rabbitTemplate.receive(QUEUE); System.out.println(receive.getBody()); }
|
配置信息
1 2 3 4 5 6 7 8 9
| spring: rabbitmq: host: 127.0.0.1 port: 5672 username: root password: 123456 listener: simple: prefetch: 1 # 预获取1条消息,防止多个消费者对一个队列进行竞争,导致消息堆积(每次只能获取1条消息,处理完才能获取下一条消息)
|
Fanout交换机
广播(Fanout)
交换机将接收到的消息分配给每一个队列进行存储,后有消费者消费

1 2 3 4 5 6 7 8 9 10 11 12 13
| @Component public class ListenerMQ { @RabbitListener(queues = "queue") public void receive(String msg) { System.out.println("queue receive msg:" + msg); }
@RabbitListener(queues = "queue1") public void receive1(String msg) { System.out.println("queue1 receive msg:" + msg); } }
|
1 2 3 4 5 6 7 8 9 10
| private static final String EXCHANGE = "amq.fanout";
@Resource RabbitTemplate rabbitTemplate;
@Test void sendMessage() { rabbitTemplate.convertAndSend(EXCHANGE, null, "hello"); }
|
绑定队列


定向(Direct)
交换机对固定的队列进行发消息,根据routingkey来区别

1 2 3 4 5 6 7 8 9 10 11
| private static final String EXCHANGE = "div.fanout";
@Resource RabbitTemplate rabbitTemplate;
@Test void sendMessage() { rabbitTemplate.convertAndSend(EXCHANGE, "red", "hello"); }
|
1 2 3 4 5
| @RabbitListener(queues = "queue") public void receive(String msg) { System.out.println("queue receive msg:" + msg); }
|
话题(Topic)
可选定多个队列进行发送
多个用.分割,#代表0个或者多个单词,例如: chain.#
*代表一个单词


1 2 3 4 5 6 7 8 9 10
| private static final String EXCHANGE = "div.topic";
@Resource RabbitTemplate rabbitTemplate;
@Test void sendMessage() { rabbitTemplate.convertAndSend(EXCHANGE, "queue.all", "hello"); }
|
声明队列交换机
第一种使用配置交换机、配置队列、配置绑定关系,但是这种配置会比较繁琐, 产生比较多的代码
第二种在消费者端使用注解@RabbitListener配置 (推荐)
1 2 3 4 5 6 7 8 9
| @RabbitListener(bindings = @QueueBinding( value = @Queue("queue"), exchange = @Exchange(name = "div.exchange", type = "topic", durable = "true"), key = {"#"} )) public void receive(String msg) { System.out.println("queue receive msg:" + msg); }
|
消息转换器
实现 Serializable 并且注入序列化类,或者使用Message来手动获取body再手动反序列化。
两种都需要序列化,一种配置自动的另一种需要手动
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| ... messageProperties.setContentType("text/plain"); messageProperties.setContentEncoding(this.defaultCharset); } else if (object instanceof Serializable) { try { bytes = SerializationUtils.serialize(object); } catch (IllegalArgumentException var5) { throw new MessageConversionException("failed to convert to serialized Message content", var5); ... public static byte[] serialize(Object object) { if (object == null) { return null; } else { ByteArrayOutputStream stream = new ByteArrayOutputStream();
try { (new ObjectOutputStream(stream)).writeObject(object); } catch (IOException var3) { throw new IllegalArgumentException("Could not serialize object of type: " + object.getClass(), var3); }
return stream.toByteArray(); } }
|
生产者可靠性
生产者重连
生产者连接失败后的重试设置,但是会导致当前线程堵塞住,如果需要非堵塞的话。需要将重试关闭或者使用异步编程之类的来将消息发送异步处理。
1 2 3 4 5 6 7
| connection-timeout: 1s template: retry: initial-interval: 1000ms max-attempts: 3 multiplier: 1 enabled: true
|
生产者确认
消息发送时失败处理,
返回值状态
- 消息投递到了MQ,但是路由失败,返回ACK成功
- 消息投递到了MQ,并且入队成功,返回ACK成功
- 消息投递MQ,并且消息持久化成功, 返回ACK成功
- 其他都会返回NACK失败
总结:只要消息到达MQ中,都会返回ACK
1 2
| publisher-confirm-type: correlated publisher-returns: true
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Configuration public class CustomReturnCallback implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returnedMessage) { System.out.println(returnedMessage); } }); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| CorrelationData correlationData = new CorrelationData(IdUtil.randomUUID()); correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() { @Override public void onFailure(Throwable ex) { }
@Override public void onSuccess(CorrelationData.Confirm result) { if (result.isAck()) { } else { } } }); rabbitTemplate.convertAndSend(EXCHANGE, "queue.all", new User("jason", 12)); ThreadUtil.sleep(2, TimeUnit.SECONDS);
|
数据持久化
消息持久化
会将消息持久化到硬盘中,并且重启MQ并不会丢失消息
1 2 3 4 5 6
| @Test void sendMessage() { MessageBuilder messageBuilder = MessageBuilder.withBody("123".getBytes()); messageBuilder.setDeliveryMode(MessageDeliveryMode.PERSISTENT); rabbitTemplate.convertAndSend(EXCHANGE, "queue.all", messageBuilder); }
|
Lazy Queue
RabbitMQ 3.6开始,增加了懒惰队列。
- 当消息MQ接收到消息后直接存储硬盘中,后续使用会从硬盘中读取(默认2048条)
- 支持数百万条消息,防止消息丢失
- 3.12后默认强制开启懒惰队列,无法更改
1 2 3 4 5 6 7 8 9 10
| @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "queue", durable = "true"), exchange = @Exchange(name = "div.exchange", type = "topic"), arguments = @Argument(name = "x-queue-mode", value = "lazy"), key = {"#"} )) public void receive(User msg) { System.out.println("queue receive msg:" + msg); }
|
1 2 3 4 5
| @Bean public Queue queueBuilder() { return QueueBuilder.durable("queue").lazy().build(); }
|
消费者可靠性
消费者消费消息后,应该给MQ给一个回执
- ack:成功处理消息,MQ删除消息
- nack:失败处理消息,MQ需要再次投递
- reject:消费者拒接此消息,MQ删除此消息
1 2 3 4 5 6
| // 配置 simple: acknowledge-mode: auto // none 默认,什么都不做 // auto 根据异常返回MQ: ack、nack、reject // manual 手动来提交
|
消息失败处理
如果消费者失败将消息丢回MQ,然后MQ又重新投递给消费者,然后消费者又消费失败,开始循环。则需要有MessageRecover接口来处理,它包含三种不同的实现
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,放回MQ
- RepublishMessageRecoverer:重试耗尽后,将消息放到其他交换机中
1 2 3 4 5 6 7 8 9 10 11
| // 消费者重试 listener: simple: prefetch: 1 acknowledge-mode: auto retry: enabled: true max-attempts: 3 initial-interval: 1s multiplier: 1 stateless: true
|
业务的幂等性
- 利用为每个消息设置一个唯一id, 需要再json序列化对象中设置。当消费者消费后将id保存到数据库中,然后如果有相同消息查询数据库判定是否有此id,有就直接放弃掉者消息处理(业务侵入,对性能也有影响)
- 业务幂等,通过改变业务来保障,例如:如果是更新某条记录,记录中有状态这个值,每次更新都会更新这个值。我们修改前可以先查询再修改。或者在sql上进行操作加条件and status = ?
延迟消息
1、死信交换机
不需要的消息,过期的消息(达到了队列中消息过期时间)、消息队列满了,最早的消息成为死信
死信交换机,需要给队列设置属性:dead-letter-exchange = 某个交换机
2、延迟消息插件
插件地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
1 2 3 4 5 6 7 8 9
| docker cp rabbitmq_delayed_message_exchange-3.9.0.ez c94f1e7dbfc0:/opt/rabbitmq/plugins
docker exec -it c94f1e7dbfc0 /bin/bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management
|