上篇文章主要以生产者角度:确保消息发出去了
,这篇文章主要以消费者的角度:确保处理了对应的消息
处理包含几层含义
- 成功
ack
; - 失败
nack
-requeue:true
orfalse
; - 扔到
死信队列
试想以下场景
消费者收到消息,业务执行
异常
,重试N次
,如果N次内成功–[ack]; 否则丢弃
或扔到死信队列
;
- Use a quorum queue (requires RabbitMQ 3.8.0 or higher) and specify a redelivery limit
- Count message redeliveries in an external store (you need a unique way to identify the message if you do this)
- Instead of nacking the message, re-publish it with a new header which counts the number of redeliveries, then acknowledge the original. This can lead to duplicate messages if you application crashes between the publish and the acknowledgment however
- quorum-queue: 设置delivery-limit;
- 重新publish;
- 生成header头retrynumbuer;
- 发布到原来的交换机
- 丢弃消息
- 比如采用redis记录重试次数,
Quonum Queue
Quorum Queues 也是RabbitMQ的队列类型,自RabbitMQ 3.8.0起可用。默认生成的queue type 是 Classic;
- 宗旨为: 将数据安全放在首位
- Classic VS Quorum
Feature | Classic Mirrored | Quorum |
---|---|---|
Non-durable queues <可以非持久> | yes | no |
Exclusivity | yes | no |
Per message persistence | per message | always |
Membership changes | automatic | manual |
TTL | yes | no |
Queue length limits | yes | partial (drop-head strategy only) |
Lazy behaviour | yes | partial (see Memory Limit) |
Message priority<优先级> | yes | no |
Consumer priority<优先级> | yes | no |
Dead letter exchanges | yes | yes |
Adheres to policies<策略> | yes | partial (dlx, queue length limits) |
Reacts to memory alarms | yes | partial (truncates log) |
Poison message handling | no | yes |
Global QoS Prefetch | yes | no |
Poison message handling: 翻译成异常消息处理,估计没啥问题。
不适用场景
临时队列;
低延迟;
数据安全性没那么高的
非常长的队列—因为quonum一直将数据存储在内存,如可以最大的消息数量、限制消息体的大小。
适用场景
对数据安全性要求高的, 例如销售系统中的接收订单或选举系统中的投票,其中可能丢失的消息将对系统的正确性和功能产生重大影响。
发布者确认收到,消费者手动ack, 配合quonum保证数据安全性。
代码实现
// send.ts |
import * as amqp from 'amqplib' |
➜ 6Consumer git:(main) ✗ ts-node receive.ts |