Skip to content
This repository has been archived by the owner on Apr 14, 2023. It is now read-only.

WIP: Continue work to support new graphql-js #133

Merged
merged 8 commits into from
May 18, 2017
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
"url": "git+/~https://github.com/apollostack/subscriptions-transport-ws.git"
},
"dependencies": {
"@types/ws": "0.0.41",
"@types/ws": "^0.0.41",
"backo2": "^1.0.2",
"eventemitter3": "^2.0.3",
"graphql": "^0.9.3",
"graphql-subscriptions": "^0.3.1",
"graphql-tag": "^2.0.0",
"iterall": "^1.1.1",
"lodash.isobject": "^3.0.2",
"lodash.isstring": "^4.0.1",
"ws": "^3.0.0"
Expand All @@ -24,7 +25,7 @@
"pretest": "npm run compile",
"test": "npm run testonly --",
"posttest": "npm run lint",
"lint": "tslint --type-check --project ./tsconfig.json",
"lint": "tslint --format=stylish --type-check --project ./tsconfig.json",
"watch": "tsc -w",
"testonly": "mocha --reporter spec --full-trace ./dist/test/tests.js",
"coverage": "node ./node_modules/istanbul/lib/cli.js cover _mocha -- --full-trace ./dist/test/tests.js",
Expand Down
110 changes: 110 additions & 0 deletions src/adapters/subscription-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import { SubscriptionManager } from 'graphql-subscriptions';
import { print, DocumentNode, ExecutionResult, GraphQLSchema } from 'graphql';
import { isASubscriptionOperation } from '../utils/is-subscriptions';
import { ExecutionIterator } from '../server';
import { createRejectionIterable } from '../utils/rejection-iterable';
import { $$asyncIterator } from 'iterall';

export const executeFromSubscriptionManager = (subscriptionManager: SubscriptionManager) => {
return (schema: GraphQLSchema,
document: DocumentNode,
rootValue?: any,
contextValue?: any,
variableValues?: { [key: string]: any },
operationName?: string): ExecutionIterator => {
const pullQueue: any[] = [];
const pushQueue: any[] = [];
let listening = true;

const pushValue = (event: any) => {
if (pullQueue.length !== 0) {
const promise = pullQueue.shift();
promise.resolve({ value: event, done: false });
} else {
pushQueue.push(event);
}
};

const pushError = (error: Error) => {
if (pullQueue.length !== 0) {
const promise = pullQueue.shift();
promise.reject(error);
} else {
pushQueue.push(error);
}
};

const pullValue = () => {
return new Promise((resolve, reject) => {
if (pushQueue.length !== 0) {
const valueOrError = pushQueue.shift();

if (valueOrError instanceof Error) {
reject(valueOrError);
} else {
resolve({ value: valueOrError, done: false });
}
} else {
pullQueue.push({ resolve, reject });
}
});
};

const emptyQueue = () => {
if (listening) {
listening = false;
pullQueue.forEach(p => p.resolve({ value: undefined, done: true }));
pullQueue.length = 0;
pushQueue.length = 0;
}
};

if (!isASubscriptionOperation(document, operationName)) {
return createRejectionIterable(
new Error('GraphQL Query or Mutation are not supported using SubscriptionManager!'));
}

const callbackHandler = (error: Error, result: ExecutionResult) => {
if (error) {
pushError(error);
} else {
pushValue(result);
}
};

const subIdPromise = subscriptionManager.subscribe({
query: print(document),
operationName,
callback: callbackHandler,
variables: variableValues,
context: contextValue,
}).catch((e: Error) => pushError(e));

return {
next() {
return listening ? pullValue() : this.return();
},
return() {
emptyQueue();

if (subIdPromise) {
subIdPromise.then((opId: number) => {
if (opId) {
subscriptionManager.unsubscribe(opId);
}
});
}

return Promise.resolve({ value: undefined, done: true });
},
throw(error: Error) {
emptyQueue();

return Promise.reject(error);
},
[$$asyncIterator]() {
return this;
},
};
};
};
11 changes: 11 additions & 0 deletions src/legacy/defined-deprecation-function-wrapper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
export const defineDeprecateFunctionWrapper = (deprecateMessage: string) => {
const wrapperFunction = () => {
if (process && process.env && process.env.NODE_ENV !== 'production') {
console.warn(deprecateMessage);
}
};

wrapperFunction();

return wrapperFunction;
};
64 changes: 64 additions & 0 deletions src/legacy/parse-legacy-protocol.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { ConnectionContext } from '../server';
import MessageTypes from '../message-types';

export const parseLegacyProtocolMessage = (connectionContext: ConnectionContext, message: any) => {
let messageToReturn = message;

switch (message.type) {
case MessageTypes.INIT:
connectionContext.isLegacy = true;
messageToReturn = { ...message, type: MessageTypes.GQL_CONNECTION_INIT };
break;
case MessageTypes.SUBSCRIPTION_START:
messageToReturn = {
id: message.id,
type: MessageTypes.GQL_START,
payload: {
query: message.query,
operationName: message.operationName,
variables: message.variables,
},
};
break;
case MessageTypes.SUBSCRIPTION_END:
messageToReturn = { ...message, type: MessageTypes.GQL_STOP };
break;
case MessageTypes.GQL_CONNECTION_ACK:
if (connectionContext.isLegacy) {
messageToReturn = { ...message, type: MessageTypes.INIT_SUCCESS };
}
break;
case MessageTypes.GQL_CONNECTION_ERROR:
if (connectionContext.isLegacy) {
messageToReturn = {
...message, type: MessageTypes.INIT_FAIL,
payload: message.payload.message ? message.payload.message : message.payload,
};
}
break;
case MessageTypes.GQL_ERROR:
if (connectionContext.isLegacy) {
messageToReturn = { ...message, type: MessageTypes.SUBSCRIPTION_FAIL };
}
break;
case MessageTypes.GQL_DATA:
if (connectionContext.isLegacy) {
messageToReturn = { ...message, type: MessageTypes.SUBSCRIPTION_DATA };
}
break;
case MessageTypes.GQL_COMPLETE:
if (connectionContext.isLegacy) {
messageToReturn = null;
}
break;
case MessageTypes.SUBSCRIPTION_SUCCESS:
if (!connectionContext.isLegacy) {
messageToReturn = null;
}
break;
default:
break;
}

return messageToReturn;
};
Loading