Node&RabbitMQ系列三 重连

学不可以已。(《荀子·劝学》)

前提

前两篇对rabbitmq的基本概念与延迟队列、死信队列进行了代码测试,默认都是理想情况的正常操作,针对复杂多变的网络环境,先不说投递的可靠性,首先服务的可用性就是第一个拦路虎,如:重连、限流。

本文目标:

  1. 单独抽离rabbitmq配置,便于之后写插件
  2. 考虑异常,比如:重联,<之前为了实现API,不想考虑>
  3. 消费端限流,为啥,因为遇到过bug…

有了前边三篇的基础,就直接上代码了

代码篇

重点


// !应该写断言的。。。下次
import * as assert from 'assert'
import * as amqp from 'amqplib'
import * as consumer from './consumer'


// 连接配置:https://www.squaremobius.net/amqp.node/channel_api.html#connect
// url | config
const config = {
protocol: 'amqp',
hostname: 'localhost',
port: 5672,
username: 'guest',
password: 'guest',
// 最大连接数,0:无限
// the size in bytes of the maximum frame allowed over the connection. 0 means no limit (but since frames have a size field which is an unsigned 32 bit integer, it’s perforce 2^32 - 1); I default it to 0x1000, i.e. 4kb, which is the allowed minimum, will fit many purposes, and not chug through Node.JS’s buffer pooling.
frameMax: 0,
// 心跳周期
heartbeat: 0,
}

let connect: amqp.Connection;
// 最大连接次数...
let maxConnectTimes = 0;
let isConnect = false;
export const init = async () => {
try {
connect = await amqp.connect(config);
// 监听error\close,重新连接
connect.on('error', err => {
reconnect(err, 'error');
});
// 什么时候会触发?网络异常、服务异常、管理后台删除
connect.on('close', err => {
reconnect(err, 'close');
});
console.info('[x]Rabbitmq connect success');
// !注册执行消费者
// 可以根据需求,多写几个?
consumer.run(connect);
return connect;
} catch (error) {
reconnect(error, 'catch');
}

}

const reconnect = (err, event) => {
// 因为后台删除连接,会同时触发error、close, 为了不一次创建两个,所以做个限制
if (!isConnect) {
isConnect = true;
maxConnectTimes++;
console.error(`[x]Lost connection to RMQ. reconnectingCount: ${maxConnectTimes}. Reconnecting in 10 seconds...`);
console.error('[x]Rabbitmq close: ', event, err);
// 5秒连接一次
return setTimeout(init, 1000 * 5);
}
}

// 公用这个连接
export const connection = () => {
return connect;
}

启动文件

import * as http from 'http'
import * as rabbitmq from './rabbitmq';
import * as producer from './producer';

/**
* 实现功能
* 1. 启动node服务 && 初始化rabbitmq <包含:重联、启动消费端>
* 2. 接口请求:http://127.0.0.1:3000/producer; 触发【生产者生产信息】
* 3. 【消费端】
* 1. 监听队列进行消费
* 2. 限流,其实设置一下参数就行了
* 3. 待完成:根据业务,扔到死信队列 ?
*/

http.createServer((req, res) => {
if (req.url === '/producer') {
producer.publish();
}
res.end('hello world')
}).listen(3000, () => {
rabbitmq.init();
console.log('开启端口3000')
})

消费端

async function consumer(args: {exchange, queue, routingKey, connection}, cb: (msg: any, channel: any) => void){
// 常规操作
const channel = await args.connection.createChannel();
await channel.assertExchange(args.exchange, 'direct', {durable: false});
const queueA = await channel.assertQueue(args.queue, {exclusive: false});
await channel.bindQueue(queueA.queue, args.exchange, args.routingKey);
// !消费端限流
await channel.prefetch(1, false);
// 消费队列
await channel.consume(queueA.queue, msg => {
cb(msg, channel);
});
}


export const run = (connection) => {
consumer({
exchange: 'order.exchange',
routingKey: 'order.routingKey',
queue: 'order.queue',
connection,
}, async (msg, channel) => {
const data = msg.content.toString();
console.info(`${(new Date()).getMinutes()}:${(new Date()).getSeconds()} consumer msg:%j`, data);
console.log('msg: ', msg)
return setTimeout(function () {
try {
/**
* 针对队列信息进行业务操作
* 1. 直接消费
* 2. 重回队列
* 3. 扔到死信队列
*/
channel.ack(msg);
// if(Number(data) < 6) {
// // 手动ack
// channel.ack(msg);
// } else {
// // !1. 重回队列
// channel.nack(msg);
// // !2. 扔到死信队列
// // 下个demo再整理。
// }
} catch (err) {
console.error('消息 Ack Error:', err)
}
// 每隔1s执行一个任务
}, 1000);
})
}

github

# 启动服务,node应该都会,要不也不会看这个
$ ts-node index.ts

代码地址:https://github.com/simuty/Node_Demo

参考链接
How to reestablish connection after a failure? #153
一次 RabbitMQ 生产故障引发的服务重连限流思考
Node + MQ 限流小计
Java秒杀系统实战系列~秒杀逻辑优化之RabbitMQ接口限流二
【RabbitMQ】一文带你搞定RabbitMQ延迟队列
amqp wiki