Skip to content

Commit

Permalink
chore(ws-core): initial support for init packets
Browse files Browse the repository at this point in the history
  • Loading branch information
DxCx committed Feb 20, 2017
1 parent 0179f92 commit 4d14c95
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import { IObservable } from 'graphql-server-observable';

import {
RGQL_MSG_START,
RGQL_MSG_INIT,
RGQL_MSG_INIT_SUCCESS,
RGQL_MSG_DATA,
RGQL_MSG_STOP,
RGQL_MSG_ERROR,
Expand Down Expand Up @@ -164,6 +166,8 @@ describe('RequestsManager', () => {
},
},
},

// finite result, the server sends complete.
{
id: 1,
type: RGQL_MSG_COMPLETE,
Expand Down Expand Up @@ -193,7 +197,7 @@ describe('RequestsManager', () => {
}, 'Request has invalid type');
});

it('returns error if no id sent', () => {
it('returns error if no id sent for start', () => {
return expectError({
id: undefined,
type: RGQL_MSG_START,
Expand All @@ -203,6 +207,16 @@ describe('RequestsManager', () => {
}, 'Request is missing id field', false);
});

it('returns error if no id sent for stop', () => {
return expectError({
id: undefined,
type: RGQL_MSG_STOP,
payload: {
query: `query { testString }`,
},
}, 'Request is missing id field', false);
});

it('returns error if no type sent', () => {
return expectError({
id: undefined,
Expand Down Expand Up @@ -252,6 +266,31 @@ describe('RequestsManager', () => {
}, 'Variables are invalid JSON.');
});

it('supports init packet', () => {
const input = Observable.of({
data: {
type: RGQL_MSG_INIT,
payload: {},
},
});

const expected = [{
type: RGQL_MSG_INIT_SUCCESS,
}];

const reqMngr = new RequestsManager({
schema,
executor: graphqlRxjs,
}, input);

return wrapToRx(reqMngr.responseObservable)
.map((v) => v.data)
.bufferCount(expected.length + 1)
.toPromise().then((res) => {
expect(res).to.deep.equal(expected);
});
});

it('able to subscribe to changes', () => {
const input = Observable.from(<Observable<RGQLPacket>[]>[
Observable.of({
Expand Down Expand Up @@ -282,10 +321,6 @@ describe('RequestsManager', () => {
},
});
}
expected.push({
id: 1,
type: RGQL_MSG_COMPLETE,
});

const reqMngr = new RequestsManager({
schema,
Expand Down Expand Up @@ -331,10 +366,6 @@ describe('RequestsManager', () => {
},
});
}
expected.push({
id: 1,
type: RGQL_MSG_COMPLETE,
});

const reqMngr = new RequestsManager({
schema,
Expand Down Expand Up @@ -488,10 +519,6 @@ describe('RequestsManager', () => {
},
});
}
expected1.push({
id: 1,
type: RGQL_MSG_COMPLETE,
});

const expected2 = [];
for ( let i = 0 ; i < 4 ; i ++ ) {
Expand All @@ -505,10 +532,6 @@ describe('RequestsManager', () => {
},
});
}
expected2.push({
id: 2,
type: RGQL_MSG_COMPLETE,
});

const reqMngr = new RequestsManager({
schema,
Expand Down
96 changes: 65 additions & 31 deletions packages/graphql-server-reactive-protocol/src/RequestsManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import {
RGQL_MSG_DATA,
RGQL_MSG_START,
RGQL_MSG_STOP,
RGQL_MSG_INIT,
RGQL_MSG_INIT_SUCCESS,
RGQL_MSG_KEEPALIVE,
RGQLPacket,
RGQLPacketData,
Expand All @@ -27,7 +29,7 @@ import {

export class RequestsManager {
protected requests: { [key: number]: Subscription } = {};
protected orphanedResponds: Subscription[] = [];
protected orphanedResponses: Subscription[] = [];
protected _responseObservable: IObservable<RGQLPacket>;
protected executeReactive: RGQLExecuteFunction;

Expand All @@ -46,9 +48,11 @@ export class RequestsManager {
});

return () => {
/* istanbul ignore else */
if ( kaSub ) {
kaSub.unsubscribe();
}
/* istanbul ignore else */
if ( sub ) {
sub.unsubscribe();
}
Expand All @@ -66,7 +70,7 @@ export class RequestsManager {
}

protected _handleRequest(request: RGQLPacket, onMessageObserver: Observer<RGQLPacket>) {
this._subscribeResponds(this._executeRequest(request.data), request, onMessageObserver);
this._subscribeResponse(this._executeRequest(request.data), request, onMessageObserver);
}

protected _keepAliveObservable(): Observable<RGQLPacket> {
Expand All @@ -87,25 +91,38 @@ export class RequestsManager {
});
}

protected _executeRequest(request: RGQLPacketData): IObservable<ExecutionResult> {
const formatErrorFn = this.graphqlOptions.formatError || formatError;

protected _executeRequest(request: RGQLPacketData): IObservable<RGQLPacketData> {
try {
this._validateRequest(request);
} catch (e) {
return Observable.throw(e);
}

this._unsubscribe(request.id);

if ( request.type === RGQL_MSG_STOP ) {
return Observable.empty();
const key: number = request.id;
switch ( request.type ) {
case RGQL_MSG_STOP:
this._unsubscribe(key);
return Observable.empty();
case RGQL_MSG_INIT:
// TODO: Add callback support.
return Observable.of({ type: RGQL_MSG_INIT_SUCCESS });
case RGQL_MSG_START:
return this._flattenObservableData(this._executeStart(request),
request.id);
/* istanbul ignore next: invalid case. */
default:
return Observable.throw(new Error('FATAL ERROR: type was overritten since validation'));
}
}

protected _executeStart(request: RGQLPacketData): IObservable<ExecutionResult> {
const formatErrorFn = this.graphqlOptions.formatError || formatError;
const graphqlRequest: RGQLPayloadStart = request.payload;
const query = graphqlRequest.query;
const operationName = graphqlRequest.operationName;
let variables = graphqlRequest.variables;

this._unsubscribe(request.id);
if (typeof variables === 'string') {
try {
variables = JSON.parse(variables);
Expand Down Expand Up @@ -136,28 +153,28 @@ export class RequestsManager {
return runQueryReactive(params);
}

protected _subscribeResponds(obs: IObservable<ExecutionResult>, request: RGQLPacket, onMessageObserver: Observer<RGQLPacket>): void {
protected _subscribeResponse(obs: IObservable<RGQLPacketData>, request: RGQLPacket, onMessageObserver: Observer<RGQLPacket>): void {
const key: number = request.data.id;
const respondSubscription = this._prepareRespondPacket(obs, key, request.metadata).subscribe(onMessageObserver);
const responseSubscription = this._prepareResponseStream(obs, key, request.metadata).subscribe(onMessageObserver);

if ( key ) {
this.requests[key] = respondSubscription;
this.requests[key] = responseSubscription;
} else {
this.orphanedResponds.push(respondSubscription);
this.orphanedResponses.push(responseSubscription);
}
}

protected _validateRequest(packet: RGQLPacketData) {
if ( undefined === packet.id ) {
throw new Error('Request is missing id field');
}

if ( undefined === packet.type ) {
throw new Error('Request is missing type field');
}

switch ( packet.type ) {
case RGQL_MSG_START:
if ( undefined === packet.id ) {
throw new Error('Request is missing id field');
}

if ( undefined === packet.payload ) {
throw new Error('Request is missing payload field');
}
Expand All @@ -169,16 +186,23 @@ export class RequestsManager {
}
return;
case RGQL_MSG_STOP:
// Nothing much to check, no payload.
return;
if ( undefined === packet.id ) {
throw new Error('Request is missing id field');
}

// Nothing much to check, no payload.
return;
case RGQL_MSG_INIT:
// payload is optional.
return;
default:
throw new Error('Request has invalid type');
}
}

protected _prepareRespondPacket(obs: IObservable<ExecutionResult>, key: number, metadata?: Object): IObservable<RGQLPacket> {
protected _prepareResponseStream(obs: IObservable<ExecutionResult>, key: number, metadata?: Object): IObservable<RGQLPacket> {
return new Observable((observer) => {
return this._flattenObservable(obs, key).subscribe({
return this._flattenObservableErrors(obs, key).subscribe({
next: (data: RGQLPacketData) => {
// data => packet (data + metadata)
const nextData = {
Expand All @@ -189,12 +213,12 @@ export class RequestsManager {
observer.next(nextData);
},
error: observer.error,
complete: observer.complete,
complete: () => { /* noop */ },
});
});
}

protected _flattenObservable(obs: IObservable<ExecutionResult>, id?: number): IObservable<RGQLPacketData> {
protected _flattenObservableData(obs: IObservable<ExecutionResult>, id?: number): IObservable<RGQLPacketData> {
return new Observable((observer) => {
return obs.subscribe({
next: (data) => {
Expand All @@ -204,19 +228,29 @@ export class RequestsManager {
payload: data,
});
},
error: observer.error,
complete: () => {
observer.next({
id,
type: RGQL_MSG_COMPLETE,
});
},
});
});
}

protected _flattenObservableErrors(obs: IObservable<RGQLPacketData>, id?: number): IObservable<RGQLPacketData> {
return new Observable((observer) => {
return obs.subscribe({
next: observer.next,
error: (e) => {
observer.next({
...((undefined !== id) ? { id } : {}),
type: RGQL_MSG_ERROR,
payload: e,
});
},
complete: () => {
observer.next({
id,
type: RGQL_MSG_COMPLETE,
});
},
complete: observer.complete,
});
});
}
Expand All @@ -226,8 +260,8 @@ export class RequestsManager {
this._unsubscribe(parseInt(k, 10));
});

while ( this.orphanedResponds.length > 0 ) {
this.orphanedResponds.pop().unsubscribe();
while ( this.orphanedResponses.length > 0 ) {
this.orphanedResponses.pop().unsubscribe();
}
}

Expand Down
28 changes: 16 additions & 12 deletions packages/graphql-server-reactive-protocol/src/messageTypes.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
import { ExecutionResult } from 'graphql';

// Refer to /~https://github.com/apollographql/graphql-server/issues/272#issuecomment-278805955 for more information about protocol
export const RGQL_MSG_ERROR = 'error';
export const RGQL_MSG_COMPLETE = 'complete';
export const RGQL_MSG_DATA = 'data';
export const RGQL_MSG_START = 'start';
export const RGQL_MSG_STOP = 'stop';
export const RGQL_MSG_KEEPALIVE = 'keepalive';
export const RGQL_MSG_ERROR = 'error';
export const RGQL_MSG_COMPLETE = 'complete';
export const RGQL_MSG_DATA = 'data';
export const RGQL_MSG_START = 'start';
export const RGQL_MSG_STOP = 'stop';
export const RGQL_MSG_KEEPALIVE = 'keepalive';
export const RGQL_MSG_INIT = 'init';
export const RGQL_MSG_INIT_SUCCESS = 'init_success';
export type RGQLMessageType = (
'error' |
'complete' |
'data' |
'start' |
'stop' |
'keepalive'
'error' |
'complete' |
'data' |
'start' |
'stop' |
'keepalive' |
'init' |
'init_success'
);

export type RGQLPayloadError = Error;
Expand Down

0 comments on commit 4d14c95

Please sign in to comment.