Skip to content

Commit

Permalink
feat: notification option
Browse files Browse the repository at this point in the history
  • Loading branch information
DIY0R committed Jun 19, 2024
1 parent 41a7a3d commit 1879f2e
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 14 deletions.
5 changes: 3 additions & 2 deletions lib/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ export const TARGET_MODULE = 'TARGET_MODULE';
export const INITIALIZATION_STEP_DELAY = 400;
export const DEFAULT_TIMEOUT = 40000;

export const INDICATE_ERROR = 'Please indicate `replyToQueue';
export const INDICATE_ERROR = 'Please indicate `replyToQueue`';
export const TIMEOUT_ERROR = 'Response timeout error';
export const RECIVED_MESSAGE_ERROR = 'Received a message but with an error';
export const ERROR_RMQ_SERVICE = 'Rmq service error';
export const INOF_NOT_FULL_OPTIONS = 'Queue will not be created if there is no bind';
export const INOF_NOT_FULL_OPTIONS =
'Queue will not be created if there is no bind';
2 changes: 1 addition & 1 deletion lib/interfaces/rmq-options.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export interface IRMQSRootAsyncOptions extends Pick<ModuleMetadata, 'imports'> {

export interface IMessageBroker {
exchange: IExchange;
replyTo: Options.AssertQueue;
replyTo?: Options.AssertQueue;
queue?: IQueue;
messageTimeout?: number;
targetModuleName: string;
Expand Down
4 changes: 2 additions & 2 deletions lib/rmq-connect.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,13 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy {
throw new Error(`Failed to send Reply Queue`);
}
}
async listerReplyQueue(
async listenReplyQueue(
queue: string,
listenQueue: (msg: ConsumeMessage | null) => void
) {
try {
await this.replyToChannel.consume(queue, listenQueue, {
noAck: false,
noAck: true,
});
} catch (error) {
throw new Error(`Failed to send listen Reply Queue`);
Expand Down
5 changes: 3 additions & 2 deletions lib/rmq-core.module.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { DynamicModule, Module, Global } from '@nestjs/common';
import { RMQ_APP_OPTIONS, RMQ_CONNECT_OPTIONS } from './constants';
import { IRMQSRootAsyncOptions, IRabbitMQConfig } from './interfaces';

import { RmqNestjsConnectService } from './rmq-connect.service';
import { IAppOptions } from './interfaces/app-options.interface';

Expand All @@ -9,7 +10,7 @@ import { IAppOptions } from './interfaces/app-options.interface';
export class RmqNestjsCoreModule {
static forRoot(
options: IRabbitMQConfig,
appOptions?: IAppOptions,
appOptions?: IAppOptions
): DynamicModule {
return {
module: RmqNestjsCoreModule,
Expand All @@ -23,7 +24,7 @@ export class RmqNestjsCoreModule {
}
static forRootAsync(
options: IRMQSRootAsyncOptions,
appOptions?: IAppOptions,
appOptions?: IAppOptions
): DynamicModule {
return {
module: RmqNestjsCoreModule,
Expand Down
34 changes: 27 additions & 7 deletions lib/rmq.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,32 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {
this.isInitialized = true;
}

public async notify<IMessage>(
topic: string,
message: IMessage,
options?: IPublishOptions
) {
await this.initializationCheck();
this.rmqNestjsConnectService.publish({
exchange: this.options.exchange.exchange,
routingKey: topic,
content: message,
options: {
appId: this.options.serviceName,
timestamp: new Date().getTime(),
...options,
},
});
return { sent: 'ok' };
}
public async send<IMessage, IReply>(
topic: string,
message: IMessage,
options?: IPublishOptions
): Promise<IReply> {
await this.initializationCheck();
if (!this.replyToQueue) return this.logger.error(INDICATE_ERROR);
return new Promise<IReply>(async (resolve, reject) => {
await this.initializationCheck();
if (!this.replyToQueue) this.logger.error(INDICATE_ERROR);
const correlationId = getUniqId();
const timeout =
options?.timeout ?? this.options.messageTimeout ?? DEFAULT_TIMEOUT;
Expand Down Expand Up @@ -96,17 +114,19 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {
private async listenQueue(message: ConsumeMessage | null): Promise<void> {
if (!message) return;
const consumeFunction = this.rmqMessageTegs.get(message.fields.routingKey);
const result = consumeFunction(JSON.parse(message.content.toString()));
const result = await consumeFunction(
JSON.parse(message.content.toString())
);
if (message.properties.replyTo) {
await this.rmqNestjsConnectService.sendToReplyQueue({
replyTo: message.properties.replyTo,
content: result || { status: 'recived' },
correlationId: message.properties.correlationId,
});
this.rmqNestjsConnectService.ack(message);
}
this.rmqNestjsConnectService.ack(message);
}
private async listerReplyQueue(
private async listenReplyQueue(
message: ConsumeMessage | null
): Promise<void> {
if (message.properties.correlationId) {
Expand Down Expand Up @@ -149,9 +169,9 @@ export class RmqService implements OnModuleInit, OnModuleDestroy {
TypeQueue.REPLY_QUEUE,
{ queue: '', options: this.options.replyTo }
);
await this.rmqNestjsConnectService.listerReplyQueue(
await this.rmqNestjsConnectService.listenReplyQueue(
this.replyToQueue.queue,
this.listerReplyQueue.bind(this)
this.listenReplyQueue.bind(this)
);
}
private async initializationCheck() {
Expand Down

0 comments on commit 1879f2e

Please sign in to comment.