Skip to content
This repository has been archived by the owner on Oct 25, 2023. It is now read-only.

feat: Add requests idempotency verification on the server side #400

Merged
merged 3 commits into from
Mar 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions lib/express/idempotency.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import log from './logger';
import LRU from 'lru-cache';
import { fs, util } from 'appium-support';
import os from 'os';
import path from 'path';
import { EventEmitter } from 'events';


const CACHE_SIZE = 1024;
const IDEMPOTENT_RESPONSES = new LRU({
max: CACHE_SIZE,
updateAgeOnGet: true,
dispose (key, {response}) {
if (response) {
fs.rimrafSync(response);
}
},
});
const MONITORED_METHODS = ['POST', 'PATCH'];
const IDEMPOTENCY_KEY_HEADER = 'x-idempotency-key';

process.on('exit', () => IDEMPOTENT_RESPONSES.reset());


function cacheResponse (key, req, res) {
const responseStateListener = new EventEmitter();
IDEMPOTENT_RESPONSES.set(key, {
method: req.method,
path: req.path,
response: null,
responseStateListener,
});
const tmpFile = path.resolve(os.tmpdir(), `${util.uuidV4()}.response`);
const responseListener = fs.createWriteStream(tmpFile, {
emitClose: true,
});
const originalSocketWriter = res.socket.write.bind(res.socket);
const patchedWriter = (chunk, encoding, next) => {
responseListener.write(chunk);
return originalSocketWriter(chunk, encoding, next);
};
res.socket.write = patchedWriter;
let writeError = null;
let isResponseFullySent = false;
responseListener.once('error', (e) => {
writeError = e;
});
res.once('finish', () => {
isResponseFullySent = true;
responseListener.end();
});
res.once('close', () => {
if (!isResponseFullySent) {
responseListener.end();
}
});
responseListener.once('close', () => {
if (!IDEMPOTENT_RESPONSES.has(key)) {
const msg = `Could not cache the response identified by '${key}'. ` +
`Cache consistency has been damaged`;
log.info(msg);
return responseStateListener.emit('error', new Error(msg));
}
if (writeError) {
log.info(`Could not cache the response identified by '${key}': ${writeError.message}`);
IDEMPOTENT_RESPONSES.del(key);
return responseStateListener.emit('error', writeError);
}
if (!isResponseFullySent) {
const msg = `Could not cache the response identified by '${key}', ` +
`because it has not been completed`;
log.info(msg);
log.info('Does the client terminate connections too early?');
IDEMPOTENT_RESPONSES.del(key);
return responseStateListener.emit('error', new Error(msg));
}

IDEMPOTENT_RESPONSES.get(key).response = tmpFile;
responseStateListener.emit('ready', tmpFile);
});
}

async function handleIdempotency (req, res, next) {
const key = req.headers[IDEMPOTENCY_KEY_HEADER];
if (!key) {
return next();
}
if (!MONITORED_METHODS.includes(req.method)) {
// GET, DELETE, etc. requests are idempotent by default
// there is no need to cache them
return next();
}

log.debug(`Request idempotency key: ${key}`);
if (!IDEMPOTENT_RESPONSES.has(key)) {
cacheResponse(key, req, res);
return next();
}

const {
method: storedMethod,
path: storedPath,
response,
responseStateListener,
} = IDEMPOTENT_RESPONSES.get(key);
if (req.method !== storedMethod || req.path !== storedPath) {
log.warn(`Got two different requests with the same idempotency key '${key}'`);
log.warn('Is the client generating idempotency keys properly?');
return next();
}

const rerouteCachedResponse = async (cachedResPath) => {
if (!await fs.exists(cachedResPath)) {
IDEMPOTENT_RESPONSES.del(key);
log.warn(`Could not read the cached response identified by key '${key}'`);
log.warn('The temporary storage is not accessible anymore');
return next();
}
fs.createReadStream(cachedResPath).pipe(res.socket);
};

if (response) {
log.info(`The same request with the idempotency key '${key}' has been already processed`);
log.info(`Rerouting its response to the current request`);
await rerouteCachedResponse(response);
} else {
log.info(`The same request with the idempotency key '${key}' is being processed`);
log.info(`Waiting for the response to be rerouted to the current request`);
responseStateListener.once('error', () => next());
responseStateListener.once('ready', rerouteCachedResponse);
}
}

export { handleIdempotency };
4 changes: 2 additions & 2 deletions lib/express/middleware.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import _ from 'lodash';
import log from './logger';
import { errors } from '../protocol';

import { handleIdempotency } from './idempotency';

function allowCrossDomain (req, res, next) {
try {
Expand Down Expand Up @@ -87,5 +87,5 @@ function catch404Handler (req, res) {
export {
allowCrossDomain, fixPythonContentType, defaultToJSONContentType,
catchAllHandler, catch404Handler, catch4XXHandler,
allowCrossDomainAsyncExecute,
allowCrossDomainAsyncExecute, handleIdempotency,
};
27 changes: 15 additions & 12 deletions lib/express/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@ import bodyParser from 'body-parser';
import methodOverride from 'method-override';
import log from './logger';
import { startLogFormatter, endLogFormatter } from './express-logging';
import { allowCrossDomain, fixPythonContentType, defaultToJSONContentType,
catchAllHandler, catch404Handler, catch4XXHandler,
allowCrossDomainAsyncExecute} from './middleware';
import {
allowCrossDomain, fixPythonContentType, defaultToJSONContentType,
catchAllHandler, catch404Handler, catch4XXHandler,
allowCrossDomainAsyncExecute, handleIdempotency,
} from './middleware';
import { guineaPig, guineaPigScrollable, guineaPigAppBanner, welcome, STATIC_DIR } from './static';
import { produceError, produceCrash } from './crash';
import { addWebSocketHandler, removeWebSocketHandler, removeAllWebSocketHandlers,
getWebSocketHandlers } from './websocket';
import {
addWebSocketHandler, removeWebSocketHandler, removeAllWebSocketHandlers,
getWebSocketHandlers
} from './websocket';
import B from 'bluebird';
import { DEFAULT_BASE_PATH } from '../protocol';

Expand Down Expand Up @@ -41,14 +45,12 @@ async function server (opts = {}) {
// http.Server.close() only stops new connections, but we need to wait until
// all connections are closed and the `close` event is emitted
const close = httpServer.close.bind(httpServer);
httpServer.close = async () => {
return await new B((resolve, reject) => {
httpServer.on('close', resolve);
close((err) => {
if (err) reject(err); // eslint-disable-line curly
});
httpServer.close = async () => await new B((resolve, reject) => {
httpServer.on('close', resolve);
close((err) => {
if (err) reject(err); // eslint-disable-line curly
});
};
});

return await new B((resolve, reject) => {
httpServer.on('error', (err) => {
Expand Down Expand Up @@ -123,6 +125,7 @@ function configureServer (app, routeConfiguringFunction, allowCors = true, baseP
} else {
app.use(allowCrossDomainAsyncExecute(basePath));
}
app.use(handleIdempotency);
app.use(fixPythonContentType(basePath));
app.use(defaultToJSONContentType);
app.use(bodyParser.urlencoded({extended: true}));
Expand Down
61 changes: 61 additions & 0 deletions test/basedriver/driver-e2e-tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,67 @@ function baseDriverE2ETests (DriverClass, defaultCaps = {}) {
}

describe('session handling', function () {
it('should handle idempotency while creating sessions', async function () {
const sessionIds = [];
let times = 0;
do {
const res = await request({
url: 'http://localhost:8181/wd/hub/session',
headers: {
'X-Idempotency-Key': '123456',
},
method: 'POST',
json: {desiredCapabilities: defaultCaps, requiredCapabilities: {}},
simple: false,
resolveWithFullResponse: true
});

sessionIds.push(res.body.sessionId);
times++;
} while (times < 2);
_.uniq(sessionIds).length.should.equal(1);

const res = await request({
url: `http://localhost:8181/wd/hub/session/${sessionIds[0]}`,
method: 'DELETE',
json: true,
simple: false,
resolveWithFullResponse: true
});
res.statusCode.should.equal(200);
res.body.status.should.equal(0);
});

it('should handle idempotency while creating parallel sessions', async function () {
const reqs = [];
let times = 0;
do {
reqs.push(request({
url: 'http://localhost:8181/wd/hub/session',
headers: {
'X-Idempotency-Key': '12345',
},
method: 'POST',
json: {desiredCapabilities: defaultCaps, requiredCapabilities: {}},
simple: false,
resolveWithFullResponse: true
}));
times++;
} while (times < 2);
const sessionIds = (await B.all(reqs)).map((x) => x.body.sessionId);
_.uniq(sessionIds).length.should.equal(1);

const res = await request({
url: `http://localhost:8181/wd/hub/session/${sessionIds[0]}`,
method: 'DELETE',
json: true,
simple: false,
resolveWithFullResponse: true
});
res.statusCode.should.equal(200);
res.body.status.should.equal(0);
});

it('should create session and retrieve a session id, then delete it', async function () {
let res = await request({
url: 'http://localhost:8181/wd/hub/session',
Expand Down
2 changes: 1 addition & 1 deletion test/express/server-specs.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ describe('server configuration', function () {
let app = {use: sinon.spy(), all: sinon.spy()};
let configureRoutes = () => {};
configureServer(app, configureRoutes);
app.use.callCount.should.equal(15);
app.use.callCount.should.equal(16);
app.all.callCount.should.equal(4);
});

Expand Down