Node&RabbitMQ系列四 X场景解构

学不可以已。(《荀子·劝学》)

目标

先设想购票的场景:用户注册成功后,三天内未登陆,则短信提醒

三段式场景: A-B-C
可以延伸至:

  1. 发起购票—>时间+未付款—>短信提醒⏰、取消订单
  2. 发起退款—>时间+未处理—>通知相关人员⏰
总的来说,满足A,到期时间后进一步处理。

思路

案例

该案例来自–RabbitMQ最佳实践

假设有个订单(Order)系统,用户下单后需要向用户发送短信通知,而所有对订单的数据显示采用了CQRS架构,即将订单的读模型和写模型分离,即所有订单的更新都通过事件发到rabbitmq,然后专门有个consumer接收这些消息用于更新订单的读模型

在次基础上,做了少许更改: 创建订单,5分钟后如果没有支付,则发短信通知。, 整个流程:

  1. 触发订单,统一放进 order.exchange 交换机,并且采用 rabbitmq delay 插件、采用topic规则;
  2. 一个订单消息,同时发到两个队列
    1. 处理订单: order.# 【后期update可以沿用该路由】
    2. 处理短信:order.create
  3. 发布消息
    1. 采用手工ack的方式,记录日志,进一步保证可靠性;
    2. 采用延迟插件
      1. order.#: 直接发送
      2. order.create: 延迟N分钟,
  4. 队列
    1. order.notification 绑定 order.create, 处理短信
    2. order.summary: 绑定order.#, 处理订单业务相关
  5. 死信队列
    1. 模拟,短信服务、订单业务异常,扔到死信队列,以供之后相关操作。

待完善

  1. 可靠投递;注:先把流程走通,实现了再完善;
  2. 重试投递;requeue问题
  3. 当然还有更多问题待发掘。

代码部分

1. rabbitmq 订单相关的交换机、队列等的名称

// 订单相关mq
export enum ORDER {
// 相关业务一个交换机
EXCHANE = 'order.exchange',
// 路由key
ROUTERKEY_CREATE = 'order.create',
ROUTERKEY_SUMMARY = 'order.#',
// 两个队列
QUEUE_NOTIFICATION = 'order.notification',
QUEUE_SUMMARY = 'order.summary',
// 死信
ELX_EXCHANGE = 'order.dlx.exchange',
ELK_QUEUE = 'order.dlq.queue'
}

2. 生产端

import * as rabbimq from "./rabbitmq";
import { ORDER, EX_TYPE } from './rbEnum';
import * as mock from 'mockjs';

export async function create() {
const connct = rabbimq.connection();
const channel = await connct.createConfirmChannel();
// @ts-ignore
// 统一业务,用一个交换机: 延迟+topic
await channel.assertExchange(ORDER.EXCHANE, EX_TYPE.DELAY, {
durable: true,
arguments: {'x-delayed-type': EX_TYPE.TOPIC }
});
// 模拟一些假的数据
let i = 0;
while(i < 3){
i++;
// N分钟后,对没支付的进行通知
const time = mock.Random.integer(500, 2000);
const expiration = 1000 * 60 * 60;
const content = { id: mock.Random.integer(1000), number: mock.Random.integer(1, 3), time };
const msg = JSON.stringify(content);

// !场景1:创建订单,如果三分钟不付款,则发短信通知
channel.publish(ORDER.EXCHANE, ORDER.ROUTERKEY_CREATE, Buffer.from(msg), {
expiration,
contentType: 'application/json',
headers: {
'x-delay': time, // 一定要设置,否则按一般的交换机
},
// 消息持久化
deliveryMode: 2
}, (err, ok) => {
// 生产端监听消息是否被ack;比如,记录日志啥的
// 如果消费端,nack, 则不会再次到这里
// console.log("是否被ack----ok: ", err, ok);
if (err !== null) {
console.warn('【SMS】Message nacked!');
} else {
console.log('【SMS】Message acked');
}
});
// !场景2, 发送后,供消费端消费
channel.publish(ORDER.EXCHANE, ORDER.QUEUE_SUMMARY, Buffer.from(msg), {}, (err, ok) => {
if (err !== null) {
console.warn('【summary】Message nacked!');
} else {
console.log('【summary】Message acked');
}
})

}
// 如果创建需要确认的channel,需要等待
// 生产者消息确认,一旦消息被投递到指定交换机,broker就会ack
await channel.waitForConfirms()
await channel.close();
};

3. 消费端

import * as rabbimq from "./rabbitmq";
import { Channel, Message } from "amqplib";
import { ORDER, EX_TYPE } from './rbEnum';
import * as mock from 'mockjs';


// 死信队列,针对无效信息的归属地
export async function orderDlq() {
const connect = rabbimq.connection();
const channel: Channel = await connect.createChannel();
await channel.assertExchange(ORDER.ELX_EXCHANGE, EX_TYPE.TOPIC, { durable: true })
const queueConsumer = await channel.assertQueue(ORDER.ELK_QUEUE, {
exclusive: true,
});
await channel.prefetch(1, false);
await channel.bindQueue(queueConsumer.queue, ORDER.ELX_EXCHANGE, 'order.#');

channel.consume(queueConsumer.queue, async msg => {
// console.info('【死信队列】收到的消息', msg.content.toString());
channel.ack(msg, false);
});
}

// 针对没有支付的,发送短信,
export async function orderSms() {
const args = {
exchange: ORDER.EXCHANE, exchangeType: EX_TYPE.DELAY,
routerKey: ORDER.ROUTERKEY_CREATE, queueName: ORDER.QUEUE_NOTIFICATION,
elx: ORDER.ELX_EXCHANGE, elk: ORDER.ELK_QUEUE
};
consumer(args, async (msg, channel) => {
const { content } = msg;
// console.log(`【短信】获取消息: ${content}`);
// 根据消息ID,查询对应用户是否支付,如果支付ack, 否则发送短信&扔到死信队列,之后再说
const success = mock.Random.boolean();
if (success) {
console.info(`【短信】无需发送短信 ${content}`, )
channel.ack(msg, false);
} else {
// !可以扔到短信队列
console.info(`【短信】该用户尚未支付,发送短信中----- ${content}`);
channel.nack(msg, false, false);
}
})
}

// 创建订单,针对订单相关信息进行处理
export async function orderSummery() {
const args = {
exchange: ORDER.EXCHANE, exchangeType: EX_TYPE.DELAY,
routerKey: ORDER.ROUTERKEY_SUMMARY, queueName: ORDER.QUEUE_SUMMARY,
elx: ORDER.ELX_EXCHANGE, elk: ORDER.ELK_QUEUE
};

consumer(args, async (msg, channel) => {
const { content, fields: { deliveryTag }, properties: { headers: { retry } } } = msg;
// console.log(`【订单-消费】获取消息: ${content}`);
// 模拟业务
const success = await mock.Random.boolean();
if (success) {
console.info('【订单-消费】消费成功', content.toString());
// broker 从内存磁盘中删除
channel.ack(msg, false);
} else {
// 仍旧保留
console.info(`【订单-消费】放入死信队列`);
channel.nack(msg, false, false);
// 最大重试次数 [加入redis 或 其他队列]
// if () {
// console.info(`【订单-消费】第 ${retry} 次消费 ${deliveryTag} 失败,尝试重试`);
// const requeue = true;
// channel.nack(msg, false, requeue);
// } else {
// console.info(`【订单-消费】第 ${retry} 次消费 ${deliveryTag} 失败,放入死信队列`);
// const requeue = false;
// channel.nack(msg, false, requeue);
// }
}
})
}


/**
* 公用消费端
* @param exchange
* @param exchangeType
* @param routerKey
* @param elx 死信交换机
* @param elk 死信队列
*/
async function consumer(args: { exchange: string, exchangeType: any, routerKey: string, queueName: string, elx: string, elk: string },
callback: (msg: Message, channel: Channel) => {}) {
const { exchange, exchangeType, routerKey, queueName, elx, elk } = args;
const connect = rabbimq.connection();
const channel: Channel = await connect.createConfirmChannel();

// ! topic + delay
await channel.assertExchange(exchange, exchangeType, { durable: true, arguments: { 'x-delayed-type': EX_TYPE.TOPIC } })

const queueConsumer = await channel.assertQueue(queueName, {
exclusive: true,
deadLetterExchange: elx,
deadLetterRoutingKey: elk,
});
await channel.prefetch(1, false);
await channel.bindQueue(queueConsumer.queue, exchange, routerKey);
channel.consume(queueConsumer.queue, async msg => {
// console.info('统一收到的消息', msg);
callback(msg, channel);
}, { noAck: false });
}

4. github

github—https://github.com/simuty/Node_Demo/tree/main/rabbitmq

参考链接
Node + MQ 限流小计
Java秒杀系统实战系列~秒杀逻辑优化之RabbitMQ接口限流二
【RabbitMQ】一文带你搞定RabbitMQ延迟队列
amqp wiki
RabbitMQ 高级篇八 消费端ACK与重回队列
RabbitMQ高级篇一 本章导航及BAT大厂如何保障生产端可靠性投递
RabbitMQ最佳实践
RabbitMQ retries on Node.JS
Rabbitmq