From 44e467477469dc1dd6f7241c833ee6e5d7eccf33 Mon Sep 17 00:00:00 2001 From: diy0r Date: Mon, 17 Jun 2024 20:55:18 +0500 Subject: [PATCH] feat: base RPC, logger --- lib/common/get-uniqId.ts | 4 + lib/common/index.ts | 3 + lib/common/logger.ts | 29 ++++ lib/common/meta-teg.discovery.ts | 7 +- lib/common/rmq-intercepter.ts | 14 ++ lib/common/rmq-pipe.ts | 14 ++ lib/constants.ts | 11 ++ lib/decorators/rmq-message.decorator.ts | 2 +- lib/decorators/transform.decorator.ts | 2 +- lib/index.ts | 4 +- lib/interfaces/app-options.interface.ts | 11 ++ lib/interfaces/metategs.ts | 2 +- lib/interfaces/rmq-options.interface.ts | 23 ++- lib/interfaces/rmqService.ts | 6 +- ...nect.service.ts => rmq-connect.service.ts} | 68 +++++++- ...stjs-core.module.ts => rmq-core.module.ts} | 21 ++- lib/rmq-nestjs.module.ts | 40 ----- lib/rmq-nestjs.service.ts | 64 -------- lib/rmq.module.ts | 47 ++++++ lib/rmq.service.ts | 155 ++++++++++++++++++ 20 files changed, 402 insertions(+), 125 deletions(-) create mode 100644 lib/common/get-uniqId.ts create mode 100644 lib/common/logger.ts create mode 100644 lib/common/rmq-intercepter.ts create mode 100644 lib/common/rmq-pipe.ts create mode 100644 lib/interfaces/app-options.interface.ts rename lib/{rmq-nestjs-connect.service.ts => rmq-connect.service.ts} (61%) rename lib/{rmq-nestjs-core.module.ts => rmq-core.module.ts} (54%) delete mode 100644 lib/rmq-nestjs.module.ts delete mode 100644 lib/rmq-nestjs.service.ts create mode 100644 lib/rmq.module.ts create mode 100644 lib/rmq.service.ts diff --git a/lib/common/get-uniqId.ts b/lib/common/get-uniqId.ts new file mode 100644 index 0000000..1bcac17 --- /dev/null +++ b/lib/common/get-uniqId.ts @@ -0,0 +1,4 @@ +import { randomUUID } from 'node:crypto'; +export const getUniqId = (): string => { + return randomUUID(); +}; diff --git a/lib/common/index.ts b/lib/common/index.ts index 51b4dff..73698cb 100644 --- a/lib/common/index.ts +++ b/lib/common/index.ts @@ -1 +1,4 @@ export * from './meta-teg.discovery'; +export * from './get-uniqId'; +export * from './rmq-intercepter'; +export * from './rmq-pipe'; diff --git a/lib/common/logger.ts b/lib/common/logger.ts new file mode 100644 index 0000000..d6cfaf8 --- /dev/null +++ b/lib/common/logger.ts @@ -0,0 +1,29 @@ +import { Logger, LoggerService } from '@nestjs/common'; +import { blueBright, white, yellow } from 'chalk'; + +export class RQMColorLogger implements LoggerService { + logMessages: boolean; + + constructor(logMessages: boolean) { + this.logMessages = logMessages ?? false; + } + log(message: any, context?: string): any { + Logger.log(message, context); + } + error(message: any, trace?: string, context?: string): any { + Logger.error(message, trace, context); + } + debug(message: any, context?: string): any { + if (!this.logMessages) { + return; + } + const msg = JSON.stringify(message); + const action = context.split(',')[0]; + const topic = context.split(',')[1]; + Logger.log(`${blueBright(action)} [${yellow(topic)}] ${white(msg)}`); + console.warn(`${blueBright(action)} [${yellow(topic)}] ${white(msg)}`); + } + warn(message: any, context?: string): any { + Logger.warn(message, context); + } +} diff --git a/lib/common/meta-teg.discovery.ts b/lib/common/meta-teg.discovery.ts index d96366a..cf0429a 100644 --- a/lib/common/meta-teg.discovery.ts +++ b/lib/common/meta-teg.discovery.ts @@ -2,7 +2,8 @@ import { Inject, Injectable } from '@nestjs/common'; import { ModulesContainer, Reflector } from '@nestjs/core'; import { MetadataScanner } from '@nestjs/core'; import { InstanceWrapper } from '@nestjs/core/injector/instance-wrapper'; -import { MetaTegsMap } from '../interfaces'; +import { TARGET_MODULE } from '../constants'; +import { IMetaTegsMap } from '../interfaces'; @Injectable() export class MetaTegsScannerService { @@ -10,7 +11,7 @@ export class MetaTegsScannerService { private readonly metadataScanner: MetadataScanner, private readonly reflector: Reflector, private readonly modulesContainer: ModulesContainer, - @Inject('TARGET_MODULE') private readonly targetModuleName: string, + @Inject(TARGET_MODULE) private readonly targetModuleName: string, ) {} public scan(metaTeg: string) { @@ -44,7 +45,7 @@ export class MetaTegsScannerService { private lookupMethods( metaTeg: string, - rmqMessagesMap: MetaTegsMap, + rmqMessagesMap: IMetaTegsMap, instance: object, prototype: object, methodName: string, diff --git a/lib/common/rmq-intercepter.ts b/lib/common/rmq-intercepter.ts new file mode 100644 index 0000000..aa29448 --- /dev/null +++ b/lib/common/rmq-intercepter.ts @@ -0,0 +1,14 @@ +import { Message } from 'amqplib'; +import { LoggerService } from '@nestjs/common'; + +export class RMQIntercepterClass { + protected logger: LoggerService; + + constructor(logger: LoggerService = console) { + this.logger = logger; + } + + async intercept(res: any, msg: Message, error?: Error): Promise { + return res; + } +} diff --git a/lib/common/rmq-pipe.ts b/lib/common/rmq-pipe.ts new file mode 100644 index 0000000..62a0aa6 --- /dev/null +++ b/lib/common/rmq-pipe.ts @@ -0,0 +1,14 @@ +import { Message } from 'amqplib'; +import { LoggerService } from '@nestjs/common'; + +export class RMQPipeClass { + protected logger: LoggerService; + + constructor(logger: LoggerService = console) { + this.logger = logger; + } + + async transform(msg: Message): Promise { + return msg; + } +} diff --git a/lib/constants.ts b/lib/constants.ts index 31d9af2..47fd27e 100644 --- a/lib/constants.ts +++ b/lib/constants.ts @@ -2,3 +2,14 @@ export const RMQ_CONNECT_OPTIONS = 'RMQ_CONNECT_OPTIONS'; export const RMQ_BROKER_OPTIONS = 'RMQ_BROKER_OPTIONS'; export const RMQ_MESSAGE_META_TEG = 'RMQ_MESSAGE_META_TEG'; export const RMQ_ROUTES_TRANSFORM = 'RMQ_ROUTES_TRANSFORM'; +export const RMQ_APP_OPTIONS = 'RMQ_APP_OPTIONS'; +export const TARGET_MODULE = 'TARGET_MODULE'; + +export const INITIALIZATION_STEP_DELAY = 400; +export const DEFAULT_TIMEOUT = 40000; + +export const INDICATE_ERROR = 'Please indicate `replyToQueue'; +export const TIMEOUT_ERROR = 'Response timeout error'; +export const RECIVED_MESSAGE_ERROR = 'Received a message but with an error'; +export const ERROR_RMQ_SERVICE = 'Rmq service error'; +export const INOF_NOT_FULL_OPTIONS = 'Queue will not be created if there is no bind'; diff --git a/lib/decorators/rmq-message.decorator.ts b/lib/decorators/rmq-message.decorator.ts index 8cd0e97..cab59b7 100644 --- a/lib/decorators/rmq-message.decorator.ts +++ b/lib/decorators/rmq-message.decorator.ts @@ -1,4 +1,4 @@ -import { RMQ_MESSAGE_META_TEG } from 'lib/constants'; +import { RMQ_MESSAGE_META_TEG } from '../constants'; export function RMQMessage(event: string) { return function (target: any, propertyKey: string | symbol, descriptor: any) { diff --git a/lib/decorators/transform.decorator.ts b/lib/decorators/transform.decorator.ts index d045117..1ce4abb 100644 --- a/lib/decorators/transform.decorator.ts +++ b/lib/decorators/transform.decorator.ts @@ -1,5 +1,5 @@ import { applyDecorators, SetMetadata } from '@nestjs/common'; -import { RMQ_ROUTES_TRANSFORM } from 'lib/constants'; +import { RMQ_ROUTES_TRANSFORM } from '../constants'; export const RMQTransform = (): MethodDecorator => { return applyDecorators(SetMetadata(RMQ_ROUTES_TRANSFORM, true)); diff --git a/lib/index.ts b/lib/index.ts index 12b5a7e..9c41bbb 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -1,2 +1,2 @@ -export * from './rmq-nestjs.module'; -export * from './rmq-nestjs.service'; +export * from './rmq.module'; +export * from './rmq.service'; diff --git a/lib/interfaces/app-options.interface.ts b/lib/interfaces/app-options.interface.ts new file mode 100644 index 0000000..4e43cc6 --- /dev/null +++ b/lib/interfaces/app-options.interface.ts @@ -0,0 +1,11 @@ +import { LoggerService } from '@nestjs/common'; +import { RMQIntercepterClass, RMQPipeClass } from '../common'; + +export interface IAppOptions { + logger?: LoggerService; + globalMiddleware?: (typeof RMQPipeClass)[]; + globalIntercepters?: (typeof RMQIntercepterClass)[]; + errorHandler?: object; + serviceName?: string; + logMessages: boolean; +} diff --git a/lib/interfaces/metategs.ts b/lib/interfaces/metategs.ts index 4f9beae..1e8c711 100644 --- a/lib/interfaces/metategs.ts +++ b/lib/interfaces/metategs.ts @@ -1 +1 @@ -export type MetaTegsMap = Map any>; +export type IMetaTegsMap = Map any>; diff --git a/lib/interfaces/rmq-options.interface.ts b/lib/interfaces/rmq-options.interface.ts index 7434eb5..907ef33 100644 --- a/lib/interfaces/rmq-options.interface.ts +++ b/lib/interfaces/rmq-options.interface.ts @@ -31,13 +31,32 @@ export interface IRMQSRootAsyncOptions extends Pick { export interface IMessageBroker { exchange: IExchange; - queue?: IQueue; replyTo: Options.AssertQueue; + queue?: IQueue; + messageTimeout?: number; targetModuleName: string; + serviceName?: string; } -export interface BindQueue { +export interface IBindQueue { queue: string; source: string; pattern: string; args?: Record; } + +export interface ISendMessage { + exchange: string; + routingKey: string; + content: Record; + options: Options.Publish; +} +export interface IPublishOptions extends Options.Publish { + timeout?: number; +} + +export interface ISendToReplyQueueOptions { + replyTo: string; + content: Record; + correlationId: string; + options?: Options.Publish; +} diff --git a/lib/interfaces/rmqService.ts b/lib/interfaces/rmqService.ts index f0ff56a..f5a14bd 100644 --- a/lib/interfaces/rmqService.ts +++ b/lib/interfaces/rmqService.ts @@ -1 +1,5 @@ -export interface ImqService {} +import { ConsumeMessage } from 'amqplib'; + +export interface ImqService { + readonly listenQueue: (msg: ConsumeMessage | null) => void; +} diff --git a/lib/rmq-nestjs-connect.service.ts b/lib/rmq-connect.service.ts similarity index 61% rename from lib/rmq-nestjs-connect.service.ts rename to lib/rmq-connect.service.ts index 087af4f..b560f0a 100644 --- a/lib/rmq-nestjs-connect.service.ts +++ b/lib/rmq-connect.service.ts @@ -10,17 +10,19 @@ import { IExchange, IQueue, TypeQueue, - BindQueue, + IBindQueue, + ISendMessage, + ISendToReplyQueueOptions, } from './interfaces'; -import { Channel, Connection, Replies, connect } from 'amqplib'; +import { Channel, Connection, ConsumeMessage, Replies, connect } from 'amqplib'; @Injectable() export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy { private connection: Connection = null; private baseChannel: Channel = null; private replyToChannel: Channel = null; - private declared = false; + constructor( @Inject(RMQ_CONNECT_OPTIONS) private readonly options: IRabbitMQConfig ) {} @@ -47,6 +49,11 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy { ); } } + public ack( + ...params: Parameters + ): ReturnType { + return this.baseChannel.ack(...params); + } public async assertQueue( typeQueue: TypeQueue, options: IQueue @@ -68,7 +75,7 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy { throw new Error(`Failed to assert ${typeQueue} queue: ${error.message}`); } } - async bindQueue(bindQueue: BindQueue): Promise { + async bindQueue(bindQueue: IBindQueue): Promise { try { await this.baseChannel.bindQueue( bindQueue.queue, @@ -82,6 +89,59 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy { ); } } + async sendToReplyQueue(sendToQueueOptions: ISendToReplyQueueOptions) { + try { + this.replyToChannel.sendToQueue( + sendToQueueOptions.replyTo, + Buffer.from(JSON.stringify(sendToQueueOptions.content)), + { + correlationId: sendToQueueOptions.correlationId, + } + ); + } catch (error) { + throw new Error(`Failed to send Reply Queue`); + } + } + async listerReplyQueue( + queue: string, + listenQueue: (msg: ConsumeMessage | null) => void + ) { + try { + await this.replyToChannel.consume(queue, listenQueue, { + noAck: false, + }); + } catch (error) { + throw new Error(`Failed to send listen Reply Queue`); + } + } + async listenQueue( + queue: string, + listenQueue: (msg: ConsumeMessage | null) => void + ): Promise { + try { + await this.baseChannel.consume(queue, listenQueue, { + noAck: false, + }); + } catch (error) { + throw new Error(`Failed to listen Queue`); + } + } + + publish(sendMessage: ISendMessage): void { + try { + this.baseChannel.publish( + sendMessage.exchange, + sendMessage.routingKey, + Buffer.from(JSON.stringify(sendMessage.content)), + { + replyTo: sendMessage.options.replyTo, + correlationId: sendMessage.options.correlationId, + } + ); + } catch (error) { + throw new Error(`Failed to send message ${error}`); + } + } private async setUpConnect(options: IRabbitMQConfig) { const { username, password, hostname, port, virtualHost } = options; const url = `amqp://${username}:${password}@${hostname}:${port}/${virtualHost}`; diff --git a/lib/rmq-nestjs-core.module.ts b/lib/rmq-core.module.ts similarity index 54% rename from lib/rmq-nestjs-core.module.ts rename to lib/rmq-core.module.ts index d5930be..d770b95 100644 --- a/lib/rmq-nestjs-core.module.ts +++ b/lib/rmq-core.module.ts @@ -1,22 +1,30 @@ import { DynamicModule, Module, Global } from '@nestjs/common'; -import { RmqNestjsConnectService } from './rmq-nestjs-connect.service'; -import { RMQ_CONNECT_OPTIONS } from './constants'; +import { RMQ_APP_OPTIONS, RMQ_CONNECT_OPTIONS } from './constants'; import { IRMQSRootAsyncOptions, IRabbitMQConfig } from './interfaces'; +import { RmqNestjsConnectService } from './rmq-connect.service'; +import { IAppOptions } from './interfaces/app-options.interface'; @Global() @Module({}) export class RmqNestjsCoreModule { - static forRoot(options: IRabbitMQConfig): DynamicModule { + static forRoot( + options: IRabbitMQConfig, + appOptions?: IAppOptions + ): DynamicModule { return { module: RmqNestjsCoreModule, providers: [ { provide: RMQ_CONNECT_OPTIONS, useValue: options }, + { provide: RMQ_APP_OPTIONS, useValue: appOptions || {} }, RmqNestjsConnectService, ], - exports: [RmqNestjsConnectService], + exports: [RmqNestjsConnectService, RMQ_APP_OPTIONS], }; } - static forRootAsync(options: IRMQSRootAsyncOptions): DynamicModule { + static forRootAsync( + options: IRMQSRootAsyncOptions, + appOptions?: IAppOptions + ): DynamicModule { return { module: RmqNestjsCoreModule, imports: options.imports, @@ -29,9 +37,10 @@ export class RmqNestjsCoreModule { }, inject: options.inject || [], }, + { provide: RMQ_APP_OPTIONS, useValue: appOptions || {} }, RmqNestjsConnectService, ], - exports: [RmqNestjsConnectService], + exports: [RmqNestjsConnectService, RMQ_APP_OPTIONS], }; } } diff --git a/lib/rmq-nestjs.module.ts b/lib/rmq-nestjs.module.ts deleted file mode 100644 index 614640a..0000000 --- a/lib/rmq-nestjs.module.ts +++ /dev/null @@ -1,40 +0,0 @@ -import { DynamicModule, Module, Provider } from '@nestjs/common'; -import { RmqService } from './rmq-nestjs.service'; -import { - IMessageBroker, - IRMQSRootAsyncOptions, - IRabbitMQConfig, -} from './interfaces'; -import { RMQ_BROKER_OPTIONS } from './constants'; -import { RmqNestjsCoreModule } from './rmq-nestjs-core.module'; -import { DiscoveryModule } from '@nestjs/core'; -import { MetaTegsScannerService } from './common'; - -@Module({}) -export class RmqNestjsModule { - static forRoot(options: IRabbitMQConfig): DynamicModule { - return { - module: RmqNestjsModule, - imports: [RmqNestjsCoreModule.forRoot(options)], - }; - } - static forRootAsync(options: IRMQSRootAsyncOptions): DynamicModule { - return { - module: RmqNestjsModule, - imports: [RmqNestjsCoreModule.forRootAsync(options)], - }; - } - static forFeature(options: IMessageBroker): DynamicModule { - return { - module: RmqNestjsModule, - imports: [DiscoveryModule], - providers: [ - { provide: RMQ_BROKER_OPTIONS, useValue: options }, - { provide: 'TARGET_MODULE', useValue: options.targetModuleName }, - RmqService, - MetaTegsScannerService, - ], - exports: [RmqService, MetaTegsScannerService], - }; - } -} diff --git a/lib/rmq-nestjs.service.ts b/lib/rmq-nestjs.service.ts deleted file mode 100644 index 8deb822..0000000 --- a/lib/rmq-nestjs.service.ts +++ /dev/null @@ -1,64 +0,0 @@ -import { - Inject, - Injectable, - OnModuleDestroy, - OnModuleInit, -} from '@nestjs/common'; -import { RmqNestjsConnectService } from './rmq-nestjs-connect.service'; - -import { IMessageBroker, TypeQueue } from './interfaces'; -import { MetaTegsMap } from './interfaces/metategs'; -import { RMQ_BROKER_OPTIONS, RMQ_MESSAGE_META_TEG } from './constants'; -import { Replies } from 'amqplib'; -import { MetaTegsScannerService } from './common'; - -@Injectable() -export class RmqService implements OnModuleInit, OnModuleDestroy { - private rmqMessageTegs: MetaTegsMap; - private replyToQueue: Replies.AssertQueue = null; - private exchange: Replies.AssertExchange = null; - constructor( - private readonly rmqNestjsConnectService: RmqNestjsConnectService, - - private readonly metaTegsScannerService: MetaTegsScannerService, - @Inject(RMQ_BROKER_OPTIONS) private options: IMessageBroker - ) {} - - async onModuleInit() { - this.rmqMessageTegs = - this.metaTegsScannerService.scan(RMQ_MESSAGE_META_TEG); - await this.bindQueueExchange(); - } - private async bindQueueExchange() { - try { - this.exchange = await this.rmqNestjsConnectService.assertExchange( - this.options.exchange - ); - if (this.options.replyTo) await this.assertReplyQueue(); - if (!this.options.queue || !this.rmqMessageTegs?.size) return; - const queue = await this.rmqNestjsConnectService.assertQueue( - TypeQueue.REPLY_QUEUE, - this.options.queue - ); - this.rmqMessageTegs.forEach(async (_, key) => { - await this.rmqNestjsConnectService.bindQueue({ - queue: queue.queue, - source: this.exchange.exchange, - pattern: key.toString(), - // args:any - comming soon - }); - }); - } catch (error) { - throw new Error(`Failed to bind queue to exchange,${error.message}`); - } - } - private async assertReplyQueue() { - this.replyToQueue = await this.rmqNestjsConnectService.assertQueue( - TypeQueue.REPLY_QUEUE, - { queue: '', options: this.options.replyTo } - ); - } - async onModuleDestroy() { - throw new Error('Method not implemented.'); - } -} diff --git a/lib/rmq.module.ts b/lib/rmq.module.ts new file mode 100644 index 0000000..4c0cc42 --- /dev/null +++ b/lib/rmq.module.ts @@ -0,0 +1,47 @@ +import { DynamicModule, Module } from '@nestjs/common'; +import { RmqService } from './rmq.service'; +import { + IMessageBroker, + IRMQSRootAsyncOptions, + IRabbitMQConfig, +} from './interfaces'; +import { RMQ_BROKER_OPTIONS, TARGET_MODULE } from './constants'; +import { DiscoveryModule } from '@nestjs/core'; +import { MetaTegsScannerService } from './common'; +import { RmqNestjsCoreModule } from './rmq-core.module'; +import { IAppOptions } from './interfaces/app-options.interface'; + +@Module({}) +export class RmqNestjsModule { + static forRoot( + options: IRabbitMQConfig, + appOptions?: IAppOptions + ): DynamicModule { + return { + module: RmqNestjsModule, + imports: [RmqNestjsCoreModule.forRoot(options, appOptions)], + }; + } + static forRootAsync( + options: IRMQSRootAsyncOptions, + appOptions?: IAppOptions + ): DynamicModule { + return { + module: RmqNestjsModule, + imports: [RmqNestjsCoreModule.forRootAsync(options, appOptions)], + }; + } + static forFeature(options: IMessageBroker): DynamicModule { + return { + module: RmqNestjsModule, + imports: [DiscoveryModule], + providers: [ + { provide: RMQ_BROKER_OPTIONS, useValue: options }, + { provide: TARGET_MODULE, useValue: options.targetModuleName }, + RmqService, + MetaTegsScannerService, + ], + exports: [RmqService, MetaTegsScannerService], + }; + } +} diff --git a/lib/rmq.service.ts b/lib/rmq.service.ts new file mode 100644 index 0000000..0250fd1 --- /dev/null +++ b/lib/rmq.service.ts @@ -0,0 +1,155 @@ +import { + Inject, + Injectable, + LoggerService, + OnModuleDestroy, + OnModuleInit, +} from '@nestjs/common'; + +import { IMessageBroker, IPublishOptions, TypeQueue } from './interfaces'; +import { IMetaTegsMap } from './interfaces/metategs'; +import { + DEFAULT_TIMEOUT, + INDICATE_ERROR, + INITIALIZATION_STEP_DELAY, + INOF_NOT_FULL_OPTIONS, + RECIVED_MESSAGE_ERROR, + RMQ_APP_OPTIONS, + RMQ_BROKER_OPTIONS, + RMQ_MESSAGE_META_TEG, + TIMEOUT_ERROR, +} from './constants'; +import { ConsumeMessage, Message, Replies } from 'amqplib'; +import { MetaTegsScannerService } from './common'; +import { RmqNestjsConnectService } from './rmq-connect.service'; +import { getUniqId } from './common/get-uniqId'; +import { EventEmitter } from 'stream'; +import { IAppOptions } from './interfaces/app-options.interface'; +import { RQMColorLogger } from './common/logger'; + +@Injectable() +export class RmqService implements OnModuleInit, OnModuleDestroy { + private sendResponseEmitter: EventEmitter = new EventEmitter(); + + private rmqMessageTegs: IMetaTegsMap = null; + private replyToQueue: Replies.AssertQueue = null; + private exchange: Replies.AssertExchange = null; + private isInitialized: boolean = false; + private logger: LoggerService; + constructor( + private readonly rmqNestjsConnectService: RmqNestjsConnectService, + private readonly metaTegsScannerService: MetaTegsScannerService, + @Inject(RMQ_BROKER_OPTIONS) private options: IMessageBroker, + @Inject(RMQ_APP_OPTIONS) private appOptions: IAppOptions, + ) { + this.logger = appOptions.logger + ? appOptions.logger + : new RQMColorLogger(this.appOptions.logMessages); + } + + async onModuleInit() { + this.rmqMessageTegs = + this.metaTegsScannerService.scan(RMQ_MESSAGE_META_TEG); + await this.bindQueueExchange(); + this.isInitialized = true; + } + + public async send( + topic: string, + message: IMessage, + options?: IPublishOptions, + ): Promise { + return new Promise(async (resolve, reject) => { + await this.initializationCheck(); + if (!this.replyToQueue) this.logger.error(INDICATE_ERROR); + const correlationId = getUniqId(); + const timeout = + options?.timeout ?? this.options.messageTimeout ?? DEFAULT_TIMEOUT; + const timerId = setTimeout(() => reject(TIMEOUT_ERROR), timeout); + this.sendResponseEmitter.once(correlationId, (msg: Message) => { + clearTimeout(timerId); + if (msg.properties?.headers?.['-x-error']) + reject(RECIVED_MESSAGE_ERROR); + const { content } = msg; + if (content.toString()) resolve(JSON.parse(content.toString())); + }); + + this.rmqNestjsConnectService.publish({ + exchange: this.options.exchange.exchange, + routingKey: topic, + content: message, + options: { + replyTo: this.replyToQueue.queue, + appId: this.options.serviceName, + correlationId, + timestamp: new Date().getTime(), + ...options, + }, + }); + }); + } + private async listenQueue(message: ConsumeMessage | null): Promise { + if (!message) return; + const consumeFunction = this.rmqMessageTegs.get(message.fields.routingKey); + const result = consumeFunction(JSON.parse(message.content.toString())); + if (message.properties.replyTo) { + await this.rmqNestjsConnectService.sendToReplyQueue({ + replyTo: message.properties.replyTo, + content: result || { status: 'recived' }, + correlationId: message.properties.correlationId, + }); + this.rmqNestjsConnectService.ack(message); + } + } + private async listerReplyQueue( + message: ConsumeMessage | null, + ): Promise { + if (message.properties.correlationId) { + this.sendResponseEmitter.emit(message.properties.correlationId, message); + } + } + private async bindQueueExchange() { + this.exchange = await this.rmqNestjsConnectService.assertExchange( + this.options.exchange, + ); + if (this.options.replyTo) await this.assertReplyQueue(); + if (!this.options.queue || !this.rmqMessageTegs?.size) + return this.logger.warn(INOF_NOT_FULL_OPTIONS); + const queue = await this.rmqNestjsConnectService.assertQueue( + TypeQueue.REPLY_QUEUE, + this.options.queue, + ); + this.rmqMessageTegs.forEach(async (_, key) => { + await this.rmqNestjsConnectService.bindQueue({ + queue: queue.queue, + source: this.exchange.exchange, + pattern: key.toString(), + // args:any - comming soon + }); + }); + await this.rmqNestjsConnectService.listenQueue( + this.options.queue.queue, + this.listenQueue.bind(this), + ); + await this.rmqNestjsConnectService.listerReplyQueue( + this.replyToQueue.queue, + this.listerReplyQueue.bind(this), + ); + } + private async initializationCheck() { + if (this.isInitialized) return; + await new Promise((resolve) => + setTimeout(resolve, INITIALIZATION_STEP_DELAY), + ); + await this.initializationCheck(); + } + private async assertReplyQueue() { + this.replyToQueue = await this.rmqNestjsConnectService.assertQueue( + TypeQueue.REPLY_QUEUE, + { queue: '', options: this.options.replyTo }, + ); + } + async onModuleDestroy() { + this.sendResponseEmitter.removeAllListeners(); + } +}