From 1879f2e3c7d28b719fca53515ce0430d607f3845 Mon Sep 17 00:00:00 2001 From: diy0r Date: Wed, 19 Jun 2024 23:13:39 +0500 Subject: [PATCH] feat: notification option --- lib/constants.ts | 5 ++-- lib/interfaces/rmq-options.interface.ts | 2 +- lib/rmq-connect.service.ts | 4 +-- lib/rmq-core.module.ts | 5 ++-- lib/rmq.service.ts | 34 ++++++++++++++++++++----- 5 files changed, 36 insertions(+), 14 deletions(-) diff --git a/lib/constants.ts b/lib/constants.ts index 47fd27e..6307206 100644 --- a/lib/constants.ts +++ b/lib/constants.ts @@ -8,8 +8,9 @@ export const TARGET_MODULE = 'TARGET_MODULE'; export const INITIALIZATION_STEP_DELAY = 400; export const DEFAULT_TIMEOUT = 40000; -export const INDICATE_ERROR = 'Please indicate `replyToQueue'; +export const INDICATE_ERROR = 'Please indicate `replyToQueue`'; export const TIMEOUT_ERROR = 'Response timeout error'; export const RECIVED_MESSAGE_ERROR = 'Received a message but with an error'; export const ERROR_RMQ_SERVICE = 'Rmq service error'; -export const INOF_NOT_FULL_OPTIONS = 'Queue will not be created if there is no bind'; +export const INOF_NOT_FULL_OPTIONS = + 'Queue will not be created if there is no bind'; diff --git a/lib/interfaces/rmq-options.interface.ts b/lib/interfaces/rmq-options.interface.ts index 907ef33..56fc0b3 100644 --- a/lib/interfaces/rmq-options.interface.ts +++ b/lib/interfaces/rmq-options.interface.ts @@ -31,7 +31,7 @@ export interface IRMQSRootAsyncOptions extends Pick { export interface IMessageBroker { exchange: IExchange; - replyTo: Options.AssertQueue; + replyTo?: Options.AssertQueue; queue?: IQueue; messageTimeout?: number; targetModuleName: string; diff --git a/lib/rmq-connect.service.ts b/lib/rmq-connect.service.ts index afb7fa4..ae4b4b7 100644 --- a/lib/rmq-connect.service.ts +++ b/lib/rmq-connect.service.ts @@ -102,13 +102,13 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy { throw new Error(`Failed to send Reply Queue`); } } - async listerReplyQueue( + async listenReplyQueue( queue: string, listenQueue: (msg: ConsumeMessage | null) => void ) { try { await this.replyToChannel.consume(queue, listenQueue, { - noAck: false, + noAck: true, }); } catch (error) { throw new Error(`Failed to send listen Reply Queue`); diff --git a/lib/rmq-core.module.ts b/lib/rmq-core.module.ts index 68efc78..b96fcd8 100644 --- a/lib/rmq-core.module.ts +++ b/lib/rmq-core.module.ts @@ -1,6 +1,7 @@ import { DynamicModule, Module, Global } from '@nestjs/common'; import { RMQ_APP_OPTIONS, RMQ_CONNECT_OPTIONS } from './constants'; import { IRMQSRootAsyncOptions, IRabbitMQConfig } from './interfaces'; + import { RmqNestjsConnectService } from './rmq-connect.service'; import { IAppOptions } from './interfaces/app-options.interface'; @@ -9,7 +10,7 @@ import { IAppOptions } from './interfaces/app-options.interface'; export class RmqNestjsCoreModule { static forRoot( options: IRabbitMQConfig, - appOptions?: IAppOptions, + appOptions?: IAppOptions ): DynamicModule { return { module: RmqNestjsCoreModule, @@ -23,7 +24,7 @@ export class RmqNestjsCoreModule { } static forRootAsync( options: IRMQSRootAsyncOptions, - appOptions?: IAppOptions, + appOptions?: IAppOptions ): DynamicModule { return { module: RmqNestjsCoreModule, diff --git a/lib/rmq.service.ts b/lib/rmq.service.ts index 73b18d4..c0dc204 100644 --- a/lib/rmq.service.ts +++ b/lib/rmq.service.ts @@ -59,14 +59,32 @@ export class RmqService implements OnModuleInit, OnModuleDestroy { this.isInitialized = true; } + public async notify( + topic: string, + message: IMessage, + options?: IPublishOptions + ) { + await this.initializationCheck(); + this.rmqNestjsConnectService.publish({ + exchange: this.options.exchange.exchange, + routingKey: topic, + content: message, + options: { + appId: this.options.serviceName, + timestamp: new Date().getTime(), + ...options, + }, + }); + return { sent: 'ok' }; + } public async send( topic: string, message: IMessage, options?: IPublishOptions ): Promise { + await this.initializationCheck(); + if (!this.replyToQueue) return this.logger.error(INDICATE_ERROR); return new Promise(async (resolve, reject) => { - await this.initializationCheck(); - if (!this.replyToQueue) this.logger.error(INDICATE_ERROR); const correlationId = getUniqId(); const timeout = options?.timeout ?? this.options.messageTimeout ?? DEFAULT_TIMEOUT; @@ -96,17 +114,19 @@ export class RmqService implements OnModuleInit, OnModuleDestroy { private async listenQueue(message: ConsumeMessage | null): Promise { if (!message) return; const consumeFunction = this.rmqMessageTegs.get(message.fields.routingKey); - const result = consumeFunction(JSON.parse(message.content.toString())); + const result = await consumeFunction( + JSON.parse(message.content.toString()) + ); if (message.properties.replyTo) { await this.rmqNestjsConnectService.sendToReplyQueue({ replyTo: message.properties.replyTo, content: result || { status: 'recived' }, correlationId: message.properties.correlationId, }); - this.rmqNestjsConnectService.ack(message); } + this.rmqNestjsConnectService.ack(message); } - private async listerReplyQueue( + private async listenReplyQueue( message: ConsumeMessage | null ): Promise { if (message.properties.correlationId) { @@ -149,9 +169,9 @@ export class RmqService implements OnModuleInit, OnModuleDestroy { TypeQueue.REPLY_QUEUE, { queue: '', options: this.options.replyTo } ); - await this.rmqNestjsConnectService.listerReplyQueue( + await this.rmqNestjsConnectService.listenReplyQueue( this.replyToQueue.queue, - this.listerReplyQueue.bind(this) + this.listenReplyQueue.bind(this) ); } private async initializationCheck() {