Skip to content

Commit

Permalink
feat: base RPC, logger
Browse files Browse the repository at this point in the history
  • Loading branch information
DIY0R committed Jun 17, 2024
1 parent df51813 commit 44e4674
Show file tree
Hide file tree
Showing 20 changed files with 402 additions and 125 deletions.
4 changes: 4 additions & 0 deletions lib/common/get-uniqId.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import { randomUUID } from 'node:crypto';
export const getUniqId = (): string => {
return randomUUID();
};
3 changes: 3 additions & 0 deletions lib/common/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
export * from './meta-teg.discovery';
export * from './get-uniqId';
export * from './rmq-intercepter';
export * from './rmq-pipe';
29 changes: 29 additions & 0 deletions lib/common/logger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { Logger, LoggerService } from '@nestjs/common';
import { blueBright, white, yellow } from 'chalk';

export class RQMColorLogger implements LoggerService {
logMessages: boolean;

constructor(logMessages: boolean) {
this.logMessages = logMessages ?? false;
}
log(message: any, context?: string): any {
Logger.log(message, context);
}
error(message: any, trace?: string, context?: string): any {
Logger.error(message, trace, context);
}
debug(message: any, context?: string): any {
if (!this.logMessages) {
return;
}
const msg = JSON.stringify(message);
const action = context.split(',')[0];
const topic = context.split(',')[1];
Logger.log(`${blueBright(action)} [${yellow(topic)}] ${white(msg)}`);
console.warn(`${blueBright(action)} [${yellow(topic)}] ${white(msg)}`);
}
warn(message: any, context?: string): any {
Logger.warn(message, context);
}
}
7 changes: 4 additions & 3 deletions lib/common/meta-teg.discovery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ import { Inject, Injectable } from '@nestjs/common';
import { ModulesContainer, Reflector } from '@nestjs/core';
import { MetadataScanner } from '@nestjs/core';
import { InstanceWrapper } from '@nestjs/core/injector/instance-wrapper';
import { MetaTegsMap } from '../interfaces';
import { TARGET_MODULE } from '../constants';
import { IMetaTegsMap } from '../interfaces';

@Injectable()
export class MetaTegsScannerService {
constructor(
private readonly metadataScanner: MetadataScanner,
private readonly reflector: Reflector,
private readonly modulesContainer: ModulesContainer,
@Inject('TARGET_MODULE') private readonly targetModuleName: string,
@Inject(TARGET_MODULE) private readonly targetModuleName: string,
) {}

public scan(metaTeg: string) {
Expand Down Expand Up @@ -44,7 +45,7 @@ export class MetaTegsScannerService {

private lookupMethods(
metaTeg: string,
rmqMessagesMap: MetaTegsMap,
rmqMessagesMap: IMetaTegsMap,
instance: object,
prototype: object,
methodName: string,
Expand Down
14 changes: 14 additions & 0 deletions lib/common/rmq-intercepter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { Message } from 'amqplib';
import { LoggerService } from '@nestjs/common';

export class RMQIntercepterClass {
protected logger: LoggerService;

constructor(logger: LoggerService = console) {
this.logger = logger;
}

async intercept(res: any, msg: Message, error?: Error): Promise<any> {
return res;
}
}
14 changes: 14 additions & 0 deletions lib/common/rmq-pipe.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { Message } from 'amqplib';
import { LoggerService } from '@nestjs/common';

export class RMQPipeClass {
protected logger: LoggerService;

constructor(logger: LoggerService = console) {
this.logger = logger;
}

async transform(msg: Message): Promise<Message> {
return msg;
}
}
11 changes: 11 additions & 0 deletions lib/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,14 @@ export const RMQ_CONNECT_OPTIONS = 'RMQ_CONNECT_OPTIONS';
export const RMQ_BROKER_OPTIONS = 'RMQ_BROKER_OPTIONS';
export const RMQ_MESSAGE_META_TEG = 'RMQ_MESSAGE_META_TEG';
export const RMQ_ROUTES_TRANSFORM = 'RMQ_ROUTES_TRANSFORM';
export const RMQ_APP_OPTIONS = 'RMQ_APP_OPTIONS';
export const TARGET_MODULE = 'TARGET_MODULE';

export const INITIALIZATION_STEP_DELAY = 400;
export const DEFAULT_TIMEOUT = 40000;

export const INDICATE_ERROR = 'Please indicate `replyToQueue';
export const TIMEOUT_ERROR = 'Response timeout error';
export const RECIVED_MESSAGE_ERROR = 'Received a message but with an error';
export const ERROR_RMQ_SERVICE = 'Rmq service error';
export const INOF_NOT_FULL_OPTIONS = 'Queue will not be created if there is no bind';
2 changes: 1 addition & 1 deletion lib/decorators/rmq-message.decorator.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { RMQ_MESSAGE_META_TEG } from 'lib/constants';
import { RMQ_MESSAGE_META_TEG } from '../constants';

export function RMQMessage(event: string) {
return function (target: any, propertyKey: string | symbol, descriptor: any) {
Expand Down
2 changes: 1 addition & 1 deletion lib/decorators/transform.decorator.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { applyDecorators, SetMetadata } from '@nestjs/common';
import { RMQ_ROUTES_TRANSFORM } from 'lib/constants';
import { RMQ_ROUTES_TRANSFORM } from '../constants';

export const RMQTransform = (): MethodDecorator => {
return applyDecorators(SetMetadata(RMQ_ROUTES_TRANSFORM, true));
Expand Down
4 changes: 2 additions & 2 deletions lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export * from './rmq-nestjs.module';
export * from './rmq-nestjs.service';
export * from './rmq.module';
export * from './rmq.service';
11 changes: 11 additions & 0 deletions lib/interfaces/app-options.interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { LoggerService } from '@nestjs/common';
import { RMQIntercepterClass, RMQPipeClass } from '../common';

export interface IAppOptions {
logger?: LoggerService;
globalMiddleware?: (typeof RMQPipeClass)[];
globalIntercepters?: (typeof RMQIntercepterClass)[];
errorHandler?: object;
serviceName?: string;
logMessages: boolean;
}
2 changes: 1 addition & 1 deletion lib/interfaces/metategs.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export type MetaTegsMap = Map<string | symbol, (...args: any[]) => any>;
export type IMetaTegsMap = Map<string | symbol, (...args: any[]) => any>;
23 changes: 21 additions & 2 deletions lib/interfaces/rmq-options.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,32 @@ export interface IRMQSRootAsyncOptions extends Pick<ModuleMetadata, 'imports'> {

export interface IMessageBroker {
exchange: IExchange;
queue?: IQueue;
replyTo: Options.AssertQueue;
queue?: IQueue;
messageTimeout?: number;
targetModuleName: string;
serviceName?: string;
}
export interface BindQueue {
export interface IBindQueue {
queue: string;
source: string;
pattern: string;
args?: Record<string, any>;
}

export interface ISendMessage {
exchange: string;
routingKey: string;
content: Record<string, any>;
options: Options.Publish;
}
export interface IPublishOptions extends Options.Publish {
timeout?: number;
}

export interface ISendToReplyQueueOptions {
replyTo: string;
content: Record<string, any>;
correlationId: string;
options?: Options.Publish;
}
6 changes: 5 additions & 1 deletion lib/interfaces/rmqService.ts
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
export interface ImqService {}
import { ConsumeMessage } from 'amqplib';

export interface ImqService {
readonly listenQueue: (msg: ConsumeMessage | null) => void;
}
68 changes: 64 additions & 4 deletions lib/rmq-nestjs-connect.service.ts → lib/rmq-connect.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,19 @@ import {
IExchange,
IQueue,
TypeQueue,
BindQueue,
IBindQueue,
ISendMessage,
ISendToReplyQueueOptions,
} from './interfaces';
import { Channel, Connection, Replies, connect } from 'amqplib';
import { Channel, Connection, ConsumeMessage, Replies, connect } from 'amqplib';

@Injectable()
export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy {
private connection: Connection = null;
private baseChannel: Channel = null;
private replyToChannel: Channel = null;

private declared = false;

constructor(
@Inject(RMQ_CONNECT_OPTIONS) private readonly options: IRabbitMQConfig
) {}
Expand All @@ -47,6 +49,11 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy {
);
}
}
public ack(
...params: Parameters<Channel['ack']>
): ReturnType<Channel['ack']> {
return this.baseChannel.ack(...params);
}
public async assertQueue(
typeQueue: TypeQueue,
options: IQueue
Expand All @@ -68,7 +75,7 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy {
throw new Error(`Failed to assert ${typeQueue} queue: ${error.message}`);
}
}
async bindQueue(bindQueue: BindQueue): Promise<void> {
async bindQueue(bindQueue: IBindQueue): Promise<void> {
try {
await this.baseChannel.bindQueue(
bindQueue.queue,
Expand All @@ -82,6 +89,59 @@ export class RmqNestjsConnectService implements OnModuleInit, OnModuleDestroy {
);
}
}
async sendToReplyQueue(sendToQueueOptions: ISendToReplyQueueOptions) {
try {
this.replyToChannel.sendToQueue(
sendToQueueOptions.replyTo,
Buffer.from(JSON.stringify(sendToQueueOptions.content)),
{
correlationId: sendToQueueOptions.correlationId,
}
);
} catch (error) {
throw new Error(`Failed to send Reply Queue`);
}
}
async listerReplyQueue(
queue: string,
listenQueue: (msg: ConsumeMessage | null) => void
) {
try {
await this.replyToChannel.consume(queue, listenQueue, {
noAck: false,
});
} catch (error) {
throw new Error(`Failed to send listen Reply Queue`);
}
}
async listenQueue(
queue: string,
listenQueue: (msg: ConsumeMessage | null) => void
): Promise<void> {
try {
await this.baseChannel.consume(queue, listenQueue, {
noAck: false,
});
} catch (error) {
throw new Error(`Failed to listen Queue`);
}
}

publish(sendMessage: ISendMessage): void {
try {
this.baseChannel.publish(
sendMessage.exchange,
sendMessage.routingKey,
Buffer.from(JSON.stringify(sendMessage.content)),
{
replyTo: sendMessage.options.replyTo,
correlationId: sendMessage.options.correlationId,
}
);
} catch (error) {
throw new Error(`Failed to send message ${error}`);
}
}
private async setUpConnect(options: IRabbitMQConfig) {
const { username, password, hostname, port, virtualHost } = options;
const url = `amqp://${username}:${password}@${hostname}:${port}/${virtualHost}`;
Expand Down
21 changes: 15 additions & 6 deletions lib/rmq-nestjs-core.module.ts → lib/rmq-core.module.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,30 @@
import { DynamicModule, Module, Global } from '@nestjs/common';
import { RmqNestjsConnectService } from './rmq-nestjs-connect.service';
import { RMQ_CONNECT_OPTIONS } from './constants';
import { RMQ_APP_OPTIONS, RMQ_CONNECT_OPTIONS } from './constants';
import { IRMQSRootAsyncOptions, IRabbitMQConfig } from './interfaces';
import { RmqNestjsConnectService } from './rmq-connect.service';
import { IAppOptions } from './interfaces/app-options.interface';

@Global()
@Module({})
export class RmqNestjsCoreModule {
static forRoot(options: IRabbitMQConfig): DynamicModule {
static forRoot(
options: IRabbitMQConfig,
appOptions?: IAppOptions
): DynamicModule {
return {
module: RmqNestjsCoreModule,
providers: [
{ provide: RMQ_CONNECT_OPTIONS, useValue: options },
{ provide: RMQ_APP_OPTIONS, useValue: appOptions || {} },
RmqNestjsConnectService,
],
exports: [RmqNestjsConnectService],
exports: [RmqNestjsConnectService, RMQ_APP_OPTIONS],
};
}
static forRootAsync(options: IRMQSRootAsyncOptions): DynamicModule {
static forRootAsync(
options: IRMQSRootAsyncOptions,
appOptions?: IAppOptions
): DynamicModule {
return {
module: RmqNestjsCoreModule,
imports: options.imports,
Expand All @@ -29,9 +37,10 @@ export class RmqNestjsCoreModule {
},
inject: options.inject || [],
},
{ provide: RMQ_APP_OPTIONS, useValue: appOptions || {} },
RmqNestjsConnectService,
],
exports: [RmqNestjsConnectService],
exports: [RmqNestjsConnectService, RMQ_APP_OPTIONS],
};
}
}
40 changes: 0 additions & 40 deletions lib/rmq-nestjs.module.ts

This file was deleted.

Loading

0 comments on commit 44e4674

Please sign in to comment.