最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

Nest.js ClientProxy 重新连接

网站源码admin26浏览0评论

Nest.js ClientProxy 重新连接

Nest.js ClientProxy 重新连接

我正在使用 Nest.js 的微服务,但我遇到了一个问题。

我的需求是向 RabbitMQ 发送消息并将消息日志保存到 mongo,如果 ClientProxy 已连接,mongo 中的 sendMessageStatus 为“ok”,如果 ClientProxy 未连接,mongo 中的 sendMessageStatus 为“fail”。

当我测试我的应用程序时,我关闭了 RabbitMQ 服务器并发送消息,mongo 可以使用 sendMessageStatus 获取此日志:“fail”,之后我重新启动 RabbitMQ 服务器并再次执行,mongo 应该获取 sendMessageStatus:“ok”,但是它不会,仍然“失败”。

这意味着即使 RabbitMQ 服务器重新启动,ClientProxy 也不会重新连接,我注意到 Nest.js 官方文档说“ClientProxy 是懒惰的。”,那么我该如何重新连接 ClientProxy?

jobQueue.module.ts

return ClientProxyFactory.create({
            transport: Transport.RMQ,
            options: {
              urls: [`amqp://${account}:${password}@${IP}:${port}`],
              queue: outputQueueName,
              serializer: {
                serialize: value => value.data,
              },
              noAck: false,
              persistent: true,
              queueOptions: {
                durable: true,
              }
            }
          });

jobQueue.service.ts

constructor(
        @Inject(CONNECTION_NAME)
        private readonly client: ClientRMQ,
    ) {
    };
async sendMessage(data: SendMessageDto) {
        try {
            this.logger.serviceDebug(SENDMESSAGE_METHOD);
            data.id = this.messageID++;
            return await this.client.connect()
                .then(() => {
                    return this.client.emit('', data)
                }).catch(err => {
                    return this.client.emit('', data)
                        .pipe(
                            catchError(connectionError => {
                                throw connectionError;
                            })
                        );
                });
        } catch (err) {
            console.log('catch in job', err);
            throw err;
        };
    };

client.connect() 无济于事。

myService.service.ts

const messageObserver = await this.jobQueueService.sendMessage(MQCLI);
                            const createdLog: CreateScheduleExecutionLogDto = {
                                ...data,
                                scheduleID: scheduleID,
                                schedule: item,
                                processDatetime: new Date(),
                            };
                            messageObserver.subscribe({
                                next: x => {
                                    console.log(x);
                                    createdLog.processStatus = OK;
                                    this.scheduleExecutionLogModel.create(createdLog);
                                },
                                error: e => {
                                    console.log(e);
                                    createdLog.processStatus = ERROR;
                                    this.scheduleExecutionLogModel.create(createdLog);
                                },
                            })
回答如下:

this.client.close() 方法允许您删除旧实例。对 this.client 的新请求将创建一个新连接

try{
    const recponse = this.client.emit('').pipe(timeout(5000)).toPromise()
}catch(e){
    if(e.err.code == 'ECONNREFUSED'){
        this.client.close()
    }
}

与本文相关的文章

发布评论

评论列表(0)

  1. 暂无评论