Node&RabbitMQ系列六 保证消费

夫大人者,与天地合其德,与日月合其明,与四时合其序,与鬼神合其吉凶。(《周易·䷀乾·文言》)

上篇文章主要以生产者角度:确保消息发出去了,这篇文章主要以消费者的角度:确保处理了对应的消息

处理包含几层含义

  1. 成功ack;
  2. 失败nack-requeue: true or false;
  3. 扔到死信队列

试想以下场景

消费者收到消息,业务执行异常, 重试N次,如果N次内成功–[ack]; 否则丢弃或扔到死信队列;

github上问了该问题,博主的回答

  1. Use a quorum queue (requires RabbitMQ 3.8.0 or higher) and specify a redelivery limit
  2. Count message redeliveries in an external store (you need a unique way to identify the message if you do this)
  3. Instead of nacking the message, re-publish it with a new header which counts the number of redeliveries, then acknowledge the original. This can lead to duplicate messages if you application crashes between the publish and the acknowledgment however

  1. quorum-queue: 设置delivery-limit;
  2. 重新publish;
    1. 生成header头retrynumbuer;
    2. 发布到原来的交换机
    3. 丢弃消息
  3. 比如采用redis记录重试次数,

Quonum Queue

RabbitMQ–Quorum Queues

Quorum Queues 也是RabbitMQ的队列类型,自RabbitMQ 3.8.0起可用。默认生成的queue type 是 Classic;

  • 宗旨为: 将数据安全放在首位
  • Classic VS Quorum
Feature Classic Mirrored Quorum
Non-durable queues <可以非持久> yes no
Exclusivity yes no
Per message persistence per message always
Membership changes automatic manual
TTL yes no
Queue length limits yes partial (drop-head strategy only)
Lazy behaviour yes partial (see Memory Limit)
Message priority<优先级> yes no
Consumer priority<优先级> yes no
Dead letter exchanges yes yes
Adheres to policies<策略> yes partial (dlx, queue length limits)
Reacts to memory alarms yes partial (truncates log)
Poison message handling no yes
Global QoS Prefetch yes no

Poison message handling: 翻译成异常消息处理,估计没啥问题。

不适用场景

  1. 临时队列;

  2. 低延迟;

  3. 数据安全性没那么高的

  4. 非常长的队列—因为quonum一直将数据存储在内存,如可以最大的消息数量、限制消息体的大小。

    适用场景

  5. 对数据安全性要求高的, 例如销售系统中的接收订单或选举系统中的投票,其中可能丢失的消息将对系统的正确性和功能产生重大影响。

  6. 发布者确认收到,消费者手动ack, 配合quonum保证数据安全性。

代码实现

// send.ts
import * as amqp from 'amqplib'

(async function publish() {
const url = `amqp://localhost:5672`;
// 默认交换机-队列-路由
const exchange = '6.delivery.limit.exchange';
const queueExName = '6.delivery.limit.queue';
const routingKey = '6.delivery.limit.routerkey';


const connect = await amqp.connect(url);
const channel = await connect.createChannel();
// 默认交换机
await channel.assertExchange(exchange, 'direct', { durable: false });
// 发布消息
channel.publish(exchange, routingKey, Buffer.from('hello world'), {
expiration: 3000
});
await sleep(1);
await connect.close();
process.exit(0);
})();

function sleep(time: number) {
return new Promise((resolve) => setTimeout(resolve, time*1000));
}
import * as amqp from 'amqplib'

/**
* 消费者 确认消息
*
* 1. 消费者重试3次,《采用quonum queue + delivery-limit: 3》
* 2.
*/

(async function consumer() {

// 交换机-路由
const url = `amqp://localhost:5672`;
// 死信交换机-路由
const deadLetterExchange = '6.dlx.exchange';
const deadLetterRoutingKey = '6.dlx.routingkey'
const deadLetterQueue = '6.dlx.queue'
// 默认交换机-队列-路由
const exchange = '6.delivery.limit.exchange';
const queueExName = '6.delivery.limit.queue';
const routingKey = '6.delivery.limit.routerkey';


const connect = await amqp.connect(url);
const channel = await connect.createChannel();
await channel.assertExchange(exchange, 'direct', { durable: false });

// 死信队列相关,为了web ui 展示数据
await channel.assertExchange(deadLetterExchange, 'direct', { durable: false });
const queueDL = await channel.assertQueue(deadLetterQueue)
await channel.bindQueue(queueDL.queue, deadLetterExchange, deadLetterRoutingKey);


const queueEX = await channel.assertQueue(queueExName, {
exclusive: false,
deadLetterExchange,
deadLetterRoutingKey,
arguments: {
'x-queue-type': 'quorum',
'x-delivery-limit': 3
}
});
await channel.bindQueue(queueEX.queue, exchange, routingKey);
await channel.consume(queueEX.queue, msg => {
console.log("消费队列", msg);
channel.nack(msg, false, true)
});
})();
➜  6Consumer git:(main) ✗ ts-node receive.ts
消费队列 { fields:
{ consumerTag: 'amq.ctag-EKH-xj09f47rsz3EqX8e7Q',
deliveryTag: 1,
redelivered: false,
exchange: '6.delivery.limit.exchange',
routingKey: '6.delivery.limit.routerkey' },
properties:
{ contentType: undefined,
contentEncoding: undefined,
# 这里
headers: {},
deliveryMode: undefined,
priority: undefined,
correlationId: undefined,
replyTo: undefined,
expiration: '3000',
messageId: undefined,
timestamp: undefined,
type: undefined,
userId: undefined,
appId: undefined,
clusterId: undefined },
content: <Buffer 68 65 6c 6c 6f 20 77 6f 72 6c 64> }
消费队列 { fields:
{ consumerTag: 'amq.ctag-EKH-xj09f47rsz3EqX8e7Q',
# 看这里
deliveryTag: 2,
redelivered: true,
exchange: '6.delivery.limit.exchange',
routingKey: '6.delivery.limit.routerkey' },
properties:
{ contentType: undefined,
contentEncoding: undefined,
# 这里
headers: { 'x-delivery-count': 1 },
deliveryMode: undefined,
priority: undefined,
correlationId: undefined,
replyTo: undefined,
expiration: '3000',
messageId: undefined,
timestamp: undefined,
type: undefined,
userId: undefined,
appId: undefined,
clusterId: undefined },
content: <Buffer 68 65 6c 6c 6f 20 77 6f 72 6c 64> }
消费队列 { fields:
{ consumerTag: 'amq.ctag-EKH-xj09f47rsz3EqX8e7Q',
deliveryTag: 3,
redelivered: true,
exchange: '6.delivery.limit.exchange',
routingKey: '6.delivery.limit.routerkey' },
properties:
{ contentType: undefined,
contentEncoding: undefined,
headers: { 'x-delivery-count': 2 },
deliveryMode: undefined,
priority: undefined,
correlationId: undefined,
replyTo: undefined,
expiration: '3000',
messageId: undefined,
timestamp: undefined,
type: undefined,
userId: undefined,
appId: undefined,
clusterId: undefined },
content: <Buffer 68 65 6c 6c 6f 20 77 6f 72 6c 64> }

RePublish