fix: Remove pumpify (#2029)
* refactor: remove `pumpify`

* docs: lint and docs

* fix: uniformly check for `metadata` event before validation

* test: emulate expected `metadata` event

* fix: `gzip ? zlib.createGzip() : new PassThrough(),`

* docs: grammar
d-goog authored Aug 12, 2022
1 parent a0ae017 commit edc1d64
Showing 3 changed files with 236 additions and 174 deletions.
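At a high level, the change swaps `pumpify`-based stream composition for Node's built-in `stream.pipeline`, in both the download and upload paths. A minimal sketch of the before/after pattern, with hypothetical file names rather than code from this diff:

```ts
import {pipeline} from 'stream';
import * as zlib from 'zlib';
import * as fs from 'fs';

// Before: const composed = pumpify([source, zlib.createGzip(), sink]);
// After: pipeline wires the same chain together and reports completion
// (or the first error) through a single callback, destroying every
// stream in the chain on failure.
const source = fs.createReadStream('input.txt'); // hypothetical input
const sink = fs.createWriteStream('input.txt.gz');

pipeline(source, zlib.createGzip(), sink, err => {
  if (err) console.error('pipeline failed:', err);
  else console.log('pipeline succeeded');
});
```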
package.json (2 changes: 0 additions & 2 deletions)
@@ -65,7 +65,6 @@
     "mime": "^3.0.0",
     "mime-types": "^2.0.8",
     "p-limit": "^3.0.1",
-    "pumpify": "^2.0.0",
     "retry-request": "^5.0.0",
     "teeny-request": "^8.0.0",
     "uuid": "^8.0.0"
@@ -85,7 +84,6 @@
     "@types/node": "^17.0.30",
     "@types/node-fetch": "^2.1.3",
     "@types/proxyquire": "^1.3.28",
-    "@types/pumpify": "^1.4.1",
     "@types/request": "^2.48.4",
     "@types/sinon": "^10.0.0",
     "@types/tmp": "0.2.3",
src/file.ts (246 changes: 149 additions & 97 deletions)
@@ -30,10 +30,8 @@ import * as crypto from 'crypto';
 import * as extend from 'extend';
 import * as fs from 'fs';
 import * as mime from 'mime';
-// eslint-disable-next-line @typescript-eslint/no-var-requires
-const pumpify = require('pumpify');
 import * as resumableUpload from './resumable-upload';
-import {Writable, Readable, PassThrough} from 'stream';
+import {Writable, Readable, pipeline, Transform, PassThrough} from 'stream';
 import * as zlib from 'zlib';
 import * as http from 'http';
@@ -1495,7 +1493,7 @@

      const headers = rawResponseStream.toJSON().headers;
      isServedCompressed = headers['content-encoding'] === 'gzip';
-      const throughStreams: Writable[] = [];
+      const transformStreams: Transform[] = [];

      if (shouldRunValidation) {
        // The x-goog-hash header should be set with a crc32c and md5 hash.
@@ -1517,28 +1515,32 @@
          crc32cGenerator: this.crc32cGenerator,
        });

-        throughStreams.push(validateStream);
+        transformStreams.push(validateStream);
      }

      if (isServedCompressed && options.decompress) {
-        throughStreams.push(zlib.createGunzip());
+        transformStreams.push(zlib.createGunzip());
      }

-      if (throughStreams.length === 1) {
-        rawResponseStream =
-          // eslint-disable-next-line @typescript-eslint/no-explicit-any
-          rawResponseStream.pipe(throughStreams[0]) as any;
-      } else if (throughStreams.length > 1) {
-        rawResponseStream = rawResponseStream.pipe(
-          pumpify.obj(throughStreams)
-        );
-      }
+      const handoffStream = new PassThrough({
+        final: async cb => {
+          // Preserving `onComplete`'s ability to
+          // close `throughStream` before pipeline
+          // attempts to.
+          await onComplete(null);
+          cb();
+        },
+      });

-      rawResponseStream
-        .on('error', onComplete)
-        .on('end', onComplete)
-        .pipe(throughStream, {end: false});
+      pipeline(
+        rawResponseStream,
+        ...(transformStreams as [Transform]),
+        handoffStream,
+        throughStream,
+        onComplete
+      );
    };

    // This is hooked to the `complete` event from the request stream. This is
    // our chance to validate the data and let the user know if anything went
    // wrong.
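An aside on the `handoffStream` introduced above: it is a `PassThrough` whose `final` hook runs async completion work before downstream streams are allowed to finish. A standalone sketch of that pattern, with a hypothetical `onComplete` (the repo's real `onComplete` performs response validation):

```ts
import {PassThrough, pipeline} from 'stream';
import * as fs from 'fs';

// Hypothetical async completion handler that must run before the
// pipeline is allowed to end the destination stream.
async function onComplete(err: Error | null): Promise<void> {
  if (err) throw err;
  // e.g. validate checksums, flush bookkeeping, emit events
}

const handoff = new PassThrough({
  // `final` runs after upstream ends but before 'finish' is emitted,
  // so async work here gates the rest of the pipeline.
  final: async cb => {
    await onComplete(null);
    cb();
  },
});

pipeline(
  fs.createReadStream('input.txt'), // hypothetical source
  handoff,
  fs.createWriteStream('copy.txt'),
  e => console.log(e ? `failed: ${e.message}` : 'done')
);
```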
@@ -1948,101 +1950,92 @@
      crc32c = false;
    }

-    // Collect data as it comes in to store in a hash. This is compared to the
-    // checksum value on the returned metadata from the API.
-    const validateStream = new HashStreamValidator({
+    /**
+     * A callback for determining when the underlying pipeline is complete.
+     * It's possible the pipeline callback could error before the write stream
+     * calls `final` so by default this will destroy the write stream unless the
+     * write stream sets this callback via its `final` handler.
+     * @param error An optional error
+     */
+    let pipelineCallback: (error?: Error | null) => void = error => {
+      writeStream.destroy(error || undefined);
+    };
+
+    // A stream for consumer to write to
+    const writeStream = new Writable({
+      final(cb) {
+        // Set the pipeline callback to this callback so the pipeline's results
+        // can be populated to the consumer
+        pipelineCallback = cb;
+
+        emitStream.end();
+      },
+      write(chunk, encoding, cb) {
+        emitStream.write(chunk, encoding, cb);
+      },
+    });
+
+    const emitStream = new PassThroughShim();
+    const hashCalculatingStream = new HashStreamValidator({
      crc32c,
      md5,
      crc32cGenerator: this.crc32cGenerator,
    });

    const fileWriteStream = duplexify();
-
-    fileWriteStream.on('progress', evt => {
-      stream.emit('progress', evt);
-    });
-
-    const passThroughShim = new PassThroughShim();
-
-    passThroughShim.on('writing', () => {
-      stream.emit('writing');
-    });
-
-    const stream = pumpify([
-      passThroughShim,
-      gzip ? zlib.createGzip() : new PassThrough(),
-      validateStream,
-      fileWriteStream,
-    ]);
+    let fileWriteStreamMetadataReceived = false;
+
+    // Handing off emitted events to users
+    emitStream.on('reading', () => writeStream.emit('reading'));
+    emitStream.on('writing', () => writeStream.emit('writing'));
+    fileWriteStream.on('progress', evt => writeStream.emit('progress', evt));
+    fileWriteStream.on('response', resp => writeStream.emit('response', resp));
+    fileWriteStream.once('metadata', () => {
+      fileWriteStreamMetadataReceived = true;
+    });

    // Wait until we've received data to determine what upload technique to use.
-    stream.on('writing', () => {
+    writeStream.on('writing', () => {
      if (options.resumable === false) {
        this.startSimpleUpload_(fileWriteStream, options);
-        return;
-      }
-      this.startResumableUpload_(fileWriteStream, options);
-    });
-
-    fileWriteStream.on('response', stream.emit.bind(stream, 'response'));
-
-    // This is to preserve the `finish` event. We wait until the request stream
-    // emits "complete", as that is when we do validation of the data. After
-    // that is successful, we can allow the stream to naturally finish.
-    //
-    // Reference for tracking when we can use a non-hack solution:
-    // /~https://github.com/nodejs/node/pull/2314
-    fileWriteStream.on('prefinish', () => {
-      stream.cork();
-    });
-
-    // Compare our hashed version vs the completed upload's version.
-    fileWriteStream.on('complete', () => {
-      const metadata = this.metadata;
-
-      // If we're doing validation, assume the worst-- a data integrity
-      // mismatch. If not, these tests won't be performed, and we can assume the
-      // best.
-      let failed = crc32c || md5;
-
-      if (crc32c && metadata.crc32c) {
-        failed = !validateStream.test('crc32c', metadata.crc32c);
-      }
-
-      if (md5 && metadata.md5Hash) {
-        failed = !validateStream.test('md5', metadata.md5Hash);
-      }
-
-      if (failed) {
-        this.delete((err: ApiError) => {
-          let code;
-          let message;
-
-          if (err) {
-            code = 'FILE_NO_UPLOAD_DELETE';
-            message = `${FileExceptionMessages.UPLOAD_MISMATCH_DELETE_FAIL}${err.message}`;
-          } else if (md5 && !metadata.md5Hash) {
-            code = 'MD5_NOT_AVAILABLE';
-            message = FileExceptionMessages.MD5_NOT_AVAILABLE;
-          } else {
-            code = 'FILE_NO_UPLOAD';
-            message = FileExceptionMessages.UPLOAD_MISMATCH;
-          }
-
-          const error = new RequestError(message);
-          error.code = code;
-          error.errors = [err!];
-
-          fileWriteStream.destroy(error);
-        });
-
-        return;
-      }
-
-      stream.uncork();
+      } else {
+        this.startResumableUpload_(fileWriteStream, options);
+      }
+
+      pipeline(
+        emitStream,
+        gzip ? zlib.createGzip() : new PassThrough(),
+        hashCalculatingStream,
+        fileWriteStream,
+        async e => {
+          if (e) {
+            return pipelineCallback(e);
+          }
+
+          // We want to make sure we've received the metadata from the server in order
+          // to properly validate the object's integrity. Depending on the type of upload,
+          // the stream could close before the response is returned.
+          if (!fileWriteStreamMetadataReceived) {
+            try {
+              await new Promise((resolve, reject) => {
+                fileWriteStream.once('metadata', resolve);
+                fileWriteStream.once('error', reject);
+              });
+            } catch (e) {
+              return pipelineCallback(e as Error);
+            }
+          }
+
+          try {
+            await this.#validateIntegrity(hashCalculatingStream, {crc32c, md5});
+            pipelineCallback();
+          } catch (e) {
+            pipelineCallback(e as Error);
+          }
+        }
+      );
    });

-    return stream as Writable;
+    return writeStream;
  }

  /**
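The rewritten `createWriteStream` also shows a second pattern worth noting: a plain `Writable` facade whose `final` callback is handed to the internal pipeline, so the consumer's 'finish' only fires after the upload and its validation complete. A reduced sketch of that handshake (validation stubbed out; the names mirror the diff but the code is illustrative):

```ts
import {Writable, PassThrough, pipeline} from 'stream';
import * as fs from 'fs';

const emitStream = new PassThrough();

// Until the consumer's `final` runs, pipeline errors destroy the facade.
let pipelineCallback: (error?: Error | null) => void = error =>
  writeStream.destroy(error || undefined);

// The facade handed to the consumer.
const writeStream = new Writable({
  write(chunk, encoding, cb) {
    emitStream.write(chunk, encoding, cb);
  },
  final(cb) {
    // Defer the facade's 'finish' until the pipeline reports back.
    pipelineCallback = cb;
    emitStream.end();
  },
});

pipeline(emitStream, fs.createWriteStream('upload.bin'), async e => {
  if (e) return pipelineCallback(e);
  // ...integrity validation would run here before signalling success...
  pipelineCallback();
});
```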
@@ -3932,6 +3925,7 @@ class File extends ServiceObject<File> {
      })
      .on('metadata', metadata => {
        this.metadata = metadata;
+        dup.emit('metadata');
      })
      .on('finish', () => {
        dup.emit('complete');
@@ -4011,6 +4005,7 @@
        }

        this.metadata = body;
+        dup.emit('metadata', body);
        dup.emit('response', resp);
        dup.emit('complete');
      });
@@ -4049,6 +4044,63 @@

    return Buffer.concat(buf);
  }
+
+  /**
+   *
+   * @param hashCalculatingStream
+   * @param verify
+   * @returns {boolean} Returns `true` if valid, throws with error otherwise
+   */
+  async #validateIntegrity(
+    hashCalculatingStream: HashStreamValidator,
+    verify: {crc32c?: boolean; md5?: boolean} = {}
+  ) {
+    const metadata = this.metadata;
+
+    // If we're doing validation, assume the worst
+    let dataMismatch = !!(verify.crc32c || verify.md5);
+
+    if (verify.crc32c && metadata.crc32c) {
+      dataMismatch = !hashCalculatingStream.test('crc32c', metadata.crc32c);
+    }
+
+    if (verify.md5 && metadata.md5Hash) {
+      dataMismatch = !hashCalculatingStream.test('md5', metadata.md5Hash);
+    }
+
+    if (dataMismatch) {
+      const errors: Error[] = [];
+      let code = '';
+      let message = '';
+
+      try {
+        await this.delete();
+
+        if (verify.md5 && !metadata.md5Hash) {
+          code = 'MD5_NOT_AVAILABLE';
+          message = FileExceptionMessages.MD5_NOT_AVAILABLE;
+        } else {
+          code = 'FILE_NO_UPLOAD';
+          message = FileExceptionMessages.UPLOAD_MISMATCH;
+        }
+      } catch (e) {
+        const error = e as Error;
+
+        code = 'FILE_NO_UPLOAD_DELETE';
+        message = `${FileExceptionMessages.UPLOAD_MISMATCH_DELETE_FAIL}${error.message}`;
+
+        errors.push(error);
+      }
+
+      const error = new RequestError(message);
+      error.code = code;
+      error.errors = errors;
+
+      throw error;
+    }
+
+    return true;
+  }
 }

 /*! Developer Documentation

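For reference, the consumer-facing contract these changes preserve: `createWriteStream` still verifies data integrity after the upload and deletes the object on a checksum mismatch. A typical usage, with hypothetical bucket and object names:

```ts
import {Storage} from '@google-cloud/storage';
import * as fs from 'fs';

const storage = new Storage();
const file = storage.bucket('my-bucket').file('my-object'); // hypothetical names

fs.createReadStream('local.txt')
  .pipe(file.createWriteStream({validation: 'crc32c'}))
  .on('progress', evt => console.log('bytes written:', evt.bytesWritten))
  .on('error', err => {
    // e.g. code 'FILE_NO_UPLOAD' when the checksum comparison fails
    console.error('upload failed:', err);
  })
  .on('finish', () => console.log('upload verified and complete'));
```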