Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

revert change to not check in files #170

Merged
2 commits merged into from
Feb 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions .github/workflows/govulncheck.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,5 @@ jobs:
- uses: actions/checkout@v3
- name: Install
run: go install golang.org/x/vuln/cmd/govulncheck@latest
- name: Go generate
run: go generate ./...
- name: Run
run: govulncheck ./...
run: govulncheck ./...
2 changes: 0 additions & 2 deletions .github/workflows/lint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ jobs:
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
- name: Go generate
run: go generate ./...
- name: Run Go linter
uses: golangci/golangci-lint-action@v3
with:
Expand Down
2 changes: 0 additions & 2 deletions .github/workflows/staticcheck.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,5 @@ jobs:
${{ runner.os }}-go-
- name: Install
run: go install honnef.co/go/tools/cmd/staticcheck@latest
- name: Go generate
run: go generate ./...
- name: Run
run: staticcheck ./...
3 changes: 0 additions & 3 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ jobs:
restore-keys: |
${{ runner.os }}-go-

- name: Go generate
run: go generate ./...

- name: Run test
run: go test -race -v -coverprofile=coverage.out ./...

Expand Down
11 changes: 0 additions & 11 deletions pkg/lang/javascript/aws_runtime/.gitignore
Original file line number Diff line number Diff line change
@@ -1,12 +1 @@
/node_modules/
dispatcher_fargate.js.tmpl
dispatcher_lambda.js.tmpl
emitter.js.tmpl
fs.js.tmpl
keyvalue.js.tmpl
orm.js.tmpl
proxy_eks.js.tmpl
proxy_fargate.js.tmpl
redis_cluster.js.tmpl
redis_node.js.tmpl
secret.js.tmpl
4 changes: 0 additions & 4 deletions pkg/lang/javascript/aws_runtime/compile_template.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,4 @@ do
mv _${var}.js ${var}.js.tmpl
ksed 's://TMPL ::g' ${var}.js.tmpl
echo "generated ${var}.js.tmpl"
echo ${var}.js.tmpl >> .gitignore
done

sort -u .gitignore > gitignore-tmp
mv gitignore-tmp .gitignore
40 changes: 40 additions & 0 deletions pkg/lang/javascript/aws_runtime/dispatcher_fargate.js.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const express = require("express");
const path = require("path");
{{if .Expose.AppModule}}
require('../{{.Expose.AppModule}}');
{{end}}
{{if .MainModule}}
require('../{{.MainModule}}');
{{end}}
const app = express();
const port = 3001;
app.use(express.json());
app.get('/', (req, res) => {
res.sendStatus(200);
});
app.post('/', async (req, res) => {
const params = req.body;
console.info(`Dispatched:`, params);
try {
const mode = parseMode(params.callType);
switch (mode) {
case 'rpc':
const result = await require(path.join('../', params.moduleName))[params.functionToCall].apply(null, params.params);
res.send(result);
break;
}
}
catch (err) {
console.error(`Dispatcher Failed`, err);
throw err;
}
});
function parseMode(__callType) {
if (__callType === 'rpc')
return 'rpc';
}
app.listen(port, () => {
console.log(`Klotho RPC Proxy listening on: ${port}`);
});
124 changes: 124 additions & 0 deletions pkg/lang/javascript/aws_runtime/dispatcher_lambda.js.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.addInflight = void 0;
const s3fs = require('./fs');
const serverless_express_1 = require("@vendia/serverless-express");
const uuid = require('uuid');
const _ = require('lodash');
const path = require('path');
{{if .Datadog}}
const { datadog } = require('datadog-lambda-js');
{{end}}
{{if .Lumigo}}
const lumigo = require('@lumigo/tracer')({ token: process.env['LUMIGO_KEY'] });
{{end}}
const inflight = new Set();
/**
* Use this to attach background processes that need to be awaited before the lambda exits.
* This is especially useful for cleanup tasks or to bridge between synchronous APIs.
*/
function addInflight(p) {
if (p != null && p != undefined) {
inflight.add(p);
p.then(() => {
inflight.delete(p);
});
}
}
exports.addInflight = addInflight;
async function lambdaHandler(event, context) {
console.info(`{{.ExecUnitName}} Dispatched`, event);
try {
let { __callType, __functionToCall, __moduleName, __params, path: __path } = event;
const mode = parseMode(event, __callType, __path);
if (!mode)
throw new Error(`Invalid Dispatcher Mode: ${mode}`);
const parameters = __params ? await s3fs.getCallParameters(__params, mode) : {};
if (!parameters)
throw new Error(`Runtime Error: Expected Parameters but got none`);
let response;
switch (mode) {
case 'webserver':
response = await webserverResponse(event, context);
break;
case 'emitter':
response = await activate_emitter(event);
break;
case 'rpc':
response = await handle_rpc_call(__functionToCall, __moduleName, parameters);
break;
case 'keepWarm':
break;
}
// await kvInterface.flushAlldMaps()
return response;
}
catch (err) {
console.error(`Dispatcher Failed`, err);
throw err;
}
finally {
try {
while (inflight.size > 0) {
const promises = Array.from(inflight);
inflight.clear();
console.info(`awaiting ${promises.length} promises before exiting`);
await Promise.all(promises);
}
}
catch (err) {
console.error('error waiting for inflight promises', err);
}
}
}
async function handle_rpc_call(__functionToCall, __moduleName, parameters) {
const result = await require(path.join('../', __moduleName))[__functionToCall].apply(null, parameters);
const payloadKey = uuid.v4();
await s3fs.saveParametersToS3(payloadKey, result);
return payloadKey;
}
async function activate_emitter(event) {
const p = [];
for (const record of event.Records) {
console.info('Processing record', JSON.stringify(record, null, 2));
const sns = record.Sns;
if (!sns)
continue;
const moduleName = sns.MessageAttributes.Path.Value;
const emitterName = sns.MessageAttributes.Name.Value;
const emitter = require(path.join('../', moduleName))[emitterName];
p.push(emitter.receive(record));
}
await Promise.all(p);
}
function parseMode(lambdaEvent, __callType, eventPathEntry) {
if (lambdaEvent.Records?.length > 0 && lambdaEvent.Records[0].Sns)
return 'emitter';
if (eventPathEntry)
return 'webserver';
if (__callType === 'rpc')
return 'rpc';
if (lambdaEvent[0] == 'warmed up')
return 'keepWarm';
}
async function webserverResponse(event, context) {
{{if and .Expose.AppModule .Expose.ExportedAppVar}}
const app = await require('../{{.Expose.AppModule}}')['{{.Expose.ExportedAppVar}}'];
return await (0, serverless_express_1.configure)({
app: app,
binarySettings: { contentTypes: ['application/octet-stream', 'image/*'] },
}).apply(null, [event, context]);
{{else}}
throw new Error('execution unit not configured to receive webserver payloads');
{{end}}
}
let handler = lambdaHandler;
{{if .Datadog}}
handler = datadog(handler);
{{end}}
{{if .Lumigo}}
if (process.env['LUMIGO_KEY']) {
handler = lumigo.trace(handler);
}
{{end}}
exports.handler = handler;
119 changes: 119 additions & 0 deletions pkg/lang/javascript/aws_runtime/emitter.js.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.Emitter = void 0;
const events = require("events");
const client_sns_1 = require("@aws-sdk/client-sns");
const client_s3_1 = require("@aws-sdk/client-s3");
const uuid_1 = require("uuid");
const crypto = require("crypto");
// @ts-ignore
const dispatcher_1 = require("./dispatcher");
const payloadBucketPhysicalName = process.env.KLOTHO_S3_PREFIX + '{{.PayloadsBucketName}}';
const appName = '{{.AppName}}';
// The account-level ARN for sns. The topics must be account-wide unique
const { SNS_ARN_BASE } = process.env;
class Emitter extends events.EventEmitter {
constructor(path, name, id) {
super();
this.path = path;
this.name = name;
this.id = id;
this.client = new client_sns_1.SNSClient({});
this.s3 = new client_s3_1.S3Client({});
}
on(eventName, listener) {
// wrap the listener and add it to the inflight promises in case the listener is an async function
// otherwise a lambda will prematurely exist before the listener has run
super.on(eventName, (...args) => {
(0, dispatcher_1.addInflight)(listener(...args));
});
return this;
}
/**
* Must match the format used in deploylib
*/
topic(event) {
const topic = `${appName}_${this.id}_${event}`;
if (topic.length <= 256) {
return topic;
}
console.log('topic too long, hashing', { topic });
const hash = crypto.createHash('sha256');
hash.update(topic);
return `${hash.digest('hex')}_${event}`;
}
async save(event, ...args) {
const msgId = (0, uuid_1.v4)();
const key = `${this.path.replace(/[^0-9a-zA-Z_-]/, '-')}_${this.name}/${event}/${msgId}`;
await this.s3.send(new client_s3_1.PutObjectCommand({
Bucket: payloadBucketPhysicalName,
Key: key,
Body: JSON.stringify(args),
}));
return key;
}
async send(event, ...args) {
const topic = this.topic(event);
const arn = `${SNS_ARN_BASE}:${topic}`;
const payloadId = await this.save(event, ...args);
const resp = await this.client.send(new client_sns_1.PublishCommand({
TopicArn: arn,
Message: payloadId,
MessageAttributes: {
Path: {
DataType: 'String',
StringValue: this.path,
},
Name: {
DataType: 'String',
StringValue: this.name,
},
Event: {
DataType: 'String',
StringValue: event,
},
},
}));
console.info('Sent message', {
event,
topic,
arn,
payloadId,
messageId: resp.MessageId,
});
}
/**
* @param record see https://docs.aws.amazon.com/lambda/latest/dg/with-sns.html
*/
async receive(record) {
const { Message: payloadId, MessageAttributes: attribs } = record.Sns;
const eventName = attribs.Event.Value;
const obj = await this.s3.send(new client_s3_1.GetObjectCommand({
Bucket: payloadBucketPhysicalName,
Key: payloadId,
}));
if (!obj.Body)
return;
const argsStr = await streamToString(obj.Body);
// TODO - would be nice to keep these around for a little for debugging/auditing purposes.
const del = this.s3.send(new client_s3_1.DeleteObjectCommand({
Bucket: payloadBucketPhysicalName,
Key: payloadId,
}));
(0, dispatcher_1.addInflight)(del);
const args = JSON.parse(argsStr);
this.emit(eventName, ...args);
}
}
exports.Emitter = Emitter;
/**
* see /~https://github.com/aws/aws-sdk-js-v3/issues/1877#issuecomment-755446927
*/
async function streamToString(stream) {
return await new Promise((resolve, reject) => {
const chunks = [];
stream.on('data', (chunk) => chunks.push(chunk));
stream.on('error', reject);
stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf-8')));
});
}
Loading