Skip to content

Commit

Permalink
fix: properly handle promise rejections
Browse files Browse the repository at this point in the history
Related: #15
  • Loading branch information
darrachequesne committed Jan 10, 2024
1 parent f1b83f4 commit 075216f
Showing 1 changed file with 17 additions and 13 deletions.
30 changes: 17 additions & 13 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ const replaceBinaryObjectsByBuffers = (obj: any) => {
return obj;
};

function onPublishError(err: Error) {
debug("something went wrong when inserting the MongoDB document: %s", err);
}

/**
* Returns a function that will create a MongoAdapter instance.
*
Expand Down Expand Up @@ -217,12 +221,12 @@ export function createAdapter(

const defaultClose = adapter.close;

adapter.close = () => {
adapter.close = async () => {
adapters.delete(nsp.name);

if (adapters.size === 0) {
changeStream.removeAllListeners("close");
changeStream.close();
await changeStream.close();
// @ts-ignore
changeStream = null;
isClosed = true;
Expand Down Expand Up @@ -273,7 +277,7 @@ export class MongoAdapter extends Adapter {

this.publish({
type: EventType.INITIAL_HEARTBEAT,
});
}).catch(onPublishError);
}

close(): Promise<void> | void {
Expand Down Expand Up @@ -301,7 +305,7 @@ export class MongoAdapter extends Adapter {
case EventType.INITIAL_HEARTBEAT: {
this.publish({
type: EventType.HEARTBEAT,
});
}).catch(onPublishError);
break;
}
case EventType.BROADCAST: {
Expand Down Expand Up @@ -400,7 +404,7 @@ export class MongoAdapter extends Adapter {
data: socket.data,
})),
},
});
}).catch(onPublishError);
break;
}
case EventType.FETCH_SOCKETS_RESPONSE: {
Expand Down Expand Up @@ -477,7 +481,7 @@ export class MongoAdapter extends Adapter {
debug("sending heartbeat");
this.publish({
type: EventType.HEARTBEAT,
});
}).catch(onPublishError);
this.scheduleHeartbeat();
}, this.heartbeatInterval);
}
Expand Down Expand Up @@ -591,7 +595,7 @@ export class MongoAdapter extends Adapter {
requestId,
opts: MongoAdapter.serializeOptions(opts),
},
});
}).catch(onPublishError);

this.ackRequests.set(requestId, {
type: EventType.BROADCAST,
Expand Down Expand Up @@ -630,7 +634,7 @@ export class MongoAdapter extends Adapter {
opts: MongoAdapter.serializeOptions(opts),
rooms,
},
});
}).catch(onPublishError);
}

delSockets(opts: BroadcastOptions, rooms: Room[]) {
Expand All @@ -647,7 +651,7 @@ export class MongoAdapter extends Adapter {
opts: MongoAdapter.serializeOptions(opts),
rooms,
},
});
}).catch(onPublishError);
}

disconnectSockets(opts: BroadcastOptions, close: boolean) {
Expand All @@ -664,7 +668,7 @@ export class MongoAdapter extends Adapter {
opts: MongoAdapter.serializeOptions(opts),
close,
},
});
}).catch(onPublishError);
}

private getExpectedResponseCount() {
Expand Down Expand Up @@ -736,7 +740,7 @@ export class MongoAdapter extends Adapter {
data: {
packet,
},
});
}).catch(onPublishError);
}

private async serverSideEmitWithAck(packet: any[]) {
Expand Down Expand Up @@ -783,15 +787,15 @@ export class MongoAdapter extends Adapter {
requestId, // the presence of this attribute defines whether an acknowledgement is needed
packet,
},
});
}).catch(onPublishError);
}

override persistSession(session: any) {
debug("persisting session: %j", session);
this.publish({
type: EventType.SESSION,
data: session,
});
}).catch(onPublishError);
}

override async restoreSession(
Expand Down

0 comments on commit 075216f

Please sign in to comment.