-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathingest.js
146 lines (136 loc) · 4.75 KB
/
ingest.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
const async = require('async');
const batch = require('./batch');
/**
 * Generate a pseudo-random alphanumeric string of exactly `size` characters.
 *
 * A random base-36 chunk is repeated enough times to cover `size`, then
 * truncated. Uses Math.random(), so NOT cryptographically secure — fine for
 * synthetic object bodies.
 *
 * @param {number} size - desired body length in characters (<= 0 yields '')
 * @returns {string} random string of length `size`
 */
function generateBody(size) {
    if (size <= 0) {
        return '';
    }
    // `|| '0'` guards the rare case where Math.random()'s base-36 form has
    // no fractional digits (e.g. exactly 0): slice(2) would then yield an
    // empty chunk, making the repeat count infinite (RangeError).
    const randomString = Math.random().toString(36).slice(2) || '0';
    return randomString
        .repeat(Math.ceil(size / randomString.length))
        .slice(0, size);
}
/**
 * Run an ingest workload against a bucket: PUT objects (plain or multipart),
 * optionally attaching a random tag set to each PUT and/or deleting each
 * object right after a successful PUT.
 *
 * @param {object} options - workload options (also passed to batch.create);
 *   fields read directly here:
 *   - {string} bucket - target bucket name
 *   - {string} [prefix] - key prefix; defaults to a timestamped one
 *   - {number} size - object size in bytes
 *   - {boolean} [oneObject] - reuse a single key (handled by the batch module)
 *   - {boolean} [deleteAfterPut] - DELETE each object after its PUT succeeds
 *   - {boolean} [addTags] - attach a random tag set (0-49 tags) to each PUT
 *   - {number} [mpuParts] - if set, upload via multipart with up to this
 *     many parts
 *   - {number} [mpuFuzzRepeatCompleteProb] - probability of repeating the
 *     completeMultipartUpload call (fuzzes duplicate completes)
 * @param {function} cb - callback(err) invoked when the batch completes
 * @returns {undefined}
 */
function ingest(options, cb) {
    if (!options.prefix) {
        // default to a unique, S3-safe timestamped prefix (':' and '.'
        // replaced so the timestamp is usable in keys)
        options.prefix = `test-${new Date().toISOString().replace(/[:.]/g, '-')}/`;
    }
    const obj = batch.create(options);
    batch.showOptions(obj);
    console.log(`
one object: ${options.oneObject ? 'yes' : 'no'}
del after put: ${options.deleteAfterPut ? 'yes' : 'no'}
add tags: ${options.addTags ? 'yes' : 'no'}
MPU parts: ${options.mpuParts ? options.mpuParts : 'N/A'}
`);
    // Single-request PUT with optional tagging.
    const putObject = (s3, bucket, key, body, tags, cb) => s3.putObject({
        Bucket: bucket,
        Key: key,
        Body: body,
        Tagging: tags,
    }, cb);
    // Multipart PUT: create -> upload parts (4 in flight) -> complete,
    // where "complete" may be issued multiple times when
    // mpuFuzzRepeatCompleteProb is set, to fuzz duplicate completes.
    // NOTE(review): the MPU path ignores `tags` (createMultipartUpload is
    // not given a Tagging parameter) — kept as-is to preserve behavior.
    const putMPU = (s3, bucket, key, body, tags, cb) => async.waterfall([
        next => s3.createMultipartUpload({
            Bucket: bucket,
            Key: key,
        }, (err, data) => {
            if (err) {
                console.error(`error during createMultipartUpload for ${bucket}/${key}:`, err.message);
                return next(err);
            }
            return next(null, data.UploadId);
        }),
        // Upload exactly the parts that were generated: `body.length` may be
        // smaller than options.mpuParts because of rounding when splitting
        // options.size (e.g. size=4, mpuParts=3 -> partSize=2 -> 2 chunks);
        // iterating options.mpuParts would send `Body: undefined` for the
        // trailing iterations.
        (uploadId, next) => async.timesLimit(body.length, 4, (n, partDone) => s3.uploadPart({
            Bucket: bucket,
            Key: key,
            UploadId: uploadId,
            PartNumber: n + 1,
            Body: body[n],
        }, (err, partData) => {
            if (err) {
                console.error(`error during upload part for ${bucket}/${key}:`,
                    err.message);
                return partDone(err);
            }
            // collect what completeMultipartUpload needs for this part
            const partInfo = {
                PartNumber: n + 1,
                ETag: partData.ETag,
            };
            return partDone(null, partInfo);
        }), (err, partsInfo) => next(err, uploadId, partsInfo)),
        (uploadId, partsInfo, next) => {
            // fuzzing: repeat the complete call with geometric distribution
            // (each extra repeat happens with probability
            // mpuFuzzRepeatCompleteProb)
            let repeat = 1;
            if (options.mpuFuzzRepeatCompleteProb) {
                while (Math.random() < options.mpuFuzzRepeatCompleteProb) {
                    repeat += 1;
                }
            }
            async.times(repeat, (i, completeDone) => s3.completeMultipartUpload({
                Bucket: bucket,
                Key: key,
                UploadId: uploadId,
                MultipartUpload: {
                    Parts: partsInfo,
                },
            }, completeDone), next);
        },
    ], err => {
        if (err) {
            // each step already logged its specific error above; this logs
            // the overall failure of the multipart upload (the old message
            // wrongly attributed every error to completeMultipartUpload)
            console.error(`error during multipart upload for ${bucket}/${key}:`,
                err.message);
        }
        return cb(err);
    });
    // Pre-generate the request body once and reuse it for every object.
    let body;
    let putFunc;
    if (options.mpuParts) {
        // split the total size into up to mpuParts chunks; the last chunk
        // may be smaller than partSize
        const partSize = Math.ceil(options.size / options.mpuParts);
        body = [];
        let remainingSize = options.size;
        while (remainingSize > 0) {
            body.push(generateBody(Math.min(partSize, remainingSize)));
            remainingSize -= partSize;
        }
        putFunc = putMPU;
    } else {
        body = generateBody(options.size);
        putFunc = putObject;
    }
    // One workload operation: PUT (optionally tagged), then optionally DELETE.
    const ingestOp = (s3, n, endSuccess, endError) => {
        batch.getKey(obj, n, key => {
            let tags = '';
            if (options.addTags) {
                // random tag set: 0..49 tags with deliberately long values,
                // joined in URL-query form as the Tagging parameter expects
                const nTags = Math.floor(Math.random() * 50);
                const tagSet = [];
                for (let i = 1; i <= nTags; ++i) {
                    tagSet.push(`TagKey${i}=VeryVeryVeryVeryVeryVeryVeryVeryVeryVeryLongTagValue${i}`);
                }
                tags = tagSet.join('&');
            }
            putFunc(s3, options.bucket, key, body, tags, err => {
                if (err) {
                    console.error(`error during "PUT ${options.bucket}/${key}":`,
                        err.message);
                    return endError();
                }
                if (!options.deleteAfterPut) {
                    return endSuccess();
                }
                return s3.deleteObject({
                    Bucket: options.bucket,
                    Key: key,
                }, err => {
                    if (err) {
                        console.error(`error during "DELETE ${options.bucket}/${key}":`,
                            err.message);
                        return endError();
                    }
                    return endSuccess();
                });
            });
        });
    };
    batch.init(obj, err => {
        if (err) {
            return cb(err);
        }
        return batch.run(obj, ingestOp, cb);
    });
}
module.exports = ingest;