Skip to content

Commit

Permalink
fix: optional param
Browse files Browse the repository at this point in the history
  • Loading branch information
DIY0R committed Jul 8, 2024
1 parent 59f7bba commit 962ab94
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 36 deletions.
4 changes: 2 additions & 2 deletions lib/rmq-core.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export class RmqNestjsCoreModule {
{ provide: RMQ_APP_OPTIONS, useValue: globalOptions || {} },
{
provide: SERDES,
useValue: globalOptions.globalBroker.serDes ?? serDes,
useValue: globalOptions?.globalBroker?.serDes ?? serDes,
},
RmqNestjsConnectService,
RmqGlobalService,
Expand All @@ -48,7 +48,7 @@ export class RmqNestjsCoreModule {
{ provide: RMQ_APP_OPTIONS, useValue: globalOptions || {} },
{
provide: SERDES,
useValue: globalOptions.globalBroker.serDes ?? serDes,
useValue: globalOptions?.globalBroker?.serDes ?? serDes,
},
RmqNestjsConnectService,
RmqGlobalService,
Expand Down
73 changes: 39 additions & 34 deletions lib/rmq.global.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { RmqNestjsConnectService } from './rmq-connect.service';
import {
DEFAULT_TIMEOUT,
INDICATE_REPLY_QUEUE,
INDICATE_REPLY_QUEUE_GLOBAL,
INITIALIZATION_STEP_DELAY,
NACKED,
RMQ_APP_OPTIONS,
Expand Down Expand Up @@ -56,41 +57,45 @@ export class RmqGlobalService implements OnModuleInit {
message: IMessage,
options?: IPublishOptions,
): Promise<IReply> {
if (!this.replyToQueue) return this.logger.error(INDICATE_REPLY_QUEUE);
await this.initializationCheck();
const { messageTimeout, serviceName } = this.globalOptions.globalBroker;
return new Promise<IReply>(async (resolve, reject) => {
const correlationId = getUniqId();
const timeout = options?.timeout ?? messageTimeout ?? DEFAULT_TIMEOUT;
const timerId = setTimeout(() => reject(TIMEOUT_ERROR), timeout);
this.sendResponseEmitter.once(correlationId, (msg: Message) => {
clearTimeout(timerId);
const { content } = msg;
if (content.toString()) resolve(this.serDes.deserialize(content));
});
const confirmationFunction = (err: any, ok: Replies.Empty) => {
if (err) {
try {
if (!this.replyToQueue) throw Error(INDICATE_REPLY_QUEUE_GLOBAL);
await this.initializationCheck();
const { messageTimeout, serviceName } = this.globalOptions.globalBroker;
return new Promise<IReply>(async (resolve, reject) => {
const correlationId = getUniqId();
const timeout = options?.timeout ?? messageTimeout ?? DEFAULT_TIMEOUT;
const timerId = setTimeout(() => reject(TIMEOUT_ERROR), timeout);
this.sendResponseEmitter.once(correlationId, (msg: Message) => {
clearTimeout(timerId);
reject(NACKED);
}
};
const { content } = msg;
if (content.toString()) resolve(this.serDes.deserialize(content));
});
const confirmationFunction = (err: any, ok: Replies.Empty) => {
if (err) {
clearTimeout(timerId);
reject(NACKED);
}
};

await this.rmqNestjsConnectService.publish(
{
exchange: exchange,
routingKey: topic,
content: this.serDes.serializer(message),
options: {
replyTo: this.replyToQueue.queue,
appId: serviceName,
correlationId,
timestamp: new Date().getTime(),
...options,
await this.rmqNestjsConnectService.publish(
{
exchange: exchange,
routingKey: topic,
content: this.serDes.serializer(message),
options: {
replyTo: this.replyToQueue.queue,
appId: serviceName,
correlationId,
timestamp: new Date().getTime(),
...options,
},
},
},
confirmationFunction,
);
});
confirmationFunction,
);
});
} catch (error) {
this.logger.error(error);
}
}

public notify<IMessage>(
Expand All @@ -110,14 +115,14 @@ export class RmqGlobalService implements OnModuleInit {
routingKey: topic,
content: this.serDes.serializer(message),
options: {
appId: this.globalOptions.globalBroker.serviceName,
appId: this.globalOptions?.globalBroker?.serviceName ?? '',
timestamp: new Date().getTime(),
...options,
},
},
confirmationFunction,
);
if (this.globalOptions.typeChanel !== TypeChanel.CONFIR_CHANEL)
if (this.globalOptions?.typeChanel !== TypeChanel.CONFIR_CHANEL)
resolve({ status: 'ok' });
});
}
Expand Down

0 comments on commit 962ab94

Please sign in to comment.