diff --git a/.github/workflows/govulncheck.yaml b/.github/workflows/govulncheck.yaml index fb938daab..3cdd851d3 100644 --- a/.github/workflows/govulncheck.yaml +++ b/.github/workflows/govulncheck.yaml @@ -27,7 +27,5 @@ jobs: - uses: actions/checkout@v3 - name: Install run: go install golang.org/x/vuln/cmd/govulncheck@latest - - name: Go generate - run: go generate ./... - name: Run - run: govulncheck ./... + run: govulncheck ./... \ No newline at end of file diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index a8412e8f2..1bb60dd61 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -25,8 +25,6 @@ jobs: key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} restore-keys: | ${{ runner.os }}-go- - - name: Go generate - run: go generate ./... - name: Run Go linter uses: golangci/golangci-lint-action@v3 with: diff --git a/.github/workflows/staticcheck.yaml b/.github/workflows/staticcheck.yaml index 20d7c0ad5..257ae2df5 100644 --- a/.github/workflows/staticcheck.yaml +++ b/.github/workflows/staticcheck.yaml @@ -27,7 +27,5 @@ jobs: ${{ runner.os }}-go- - name: Install run: go install honnef.co/go/tools/cmd/staticcheck@latest - - name: Go generate - run: go generate ./... - name: Run run: staticcheck ./... diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index f7517ba32..294b21b4d 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -26,9 +26,6 @@ jobs: restore-keys: | ${{ runner.os }}-go- - - name: Go generate - run: go generate ./... - - name: Run test run: go test -race -v -coverprofile=coverage.out ./... diff --git a/pkg/lang/javascript/aws_runtime/.gitignore b/pkg/lang/javascript/aws_runtime/.gitignore index 6667e494f..2ccbe4656 100644 --- a/pkg/lang/javascript/aws_runtime/.gitignore +++ b/pkg/lang/javascript/aws_runtime/.gitignore @@ -1,12 +1 @@ /node_modules/ -dispatcher_fargate.js.tmpl -dispatcher_lambda.js.tmpl -emitter.js.tmpl -fs.js.tmpl -keyvalue.js.tmpl -orm.js.tmpl -proxy_eks.js.tmpl -proxy_fargate.js.tmpl -redis_cluster.js.tmpl -redis_node.js.tmpl -secret.js.tmpl diff --git a/pkg/lang/javascript/aws_runtime/compile_template.sh b/pkg/lang/javascript/aws_runtime/compile_template.sh index f1a3bf98c..5caabdf31 100755 --- a/pkg/lang/javascript/aws_runtime/compile_template.sh +++ b/pkg/lang/javascript/aws_runtime/compile_template.sh @@ -16,8 +16,4 @@ do mv _${var}.js ${var}.js.tmpl ksed 's://TMPL ::g' ${var}.js.tmpl echo "generated ${var}.js.tmpl" - echo ${var}.js.tmpl >> .gitignore done - -sort -u .gitignore > gitignore-tmp -mv gitignore-tmp .gitignore diff --git a/pkg/lang/javascript/aws_runtime/dispatcher_fargate.js.tmpl b/pkg/lang/javascript/aws_runtime/dispatcher_fargate.js.tmpl new file mode 100644 index 000000000..0ee60c447 --- /dev/null +++ b/pkg/lang/javascript/aws_runtime/dispatcher_fargate.js.tmpl @@ -0,0 +1,40 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +const express = require("express"); +const path = require("path"); +{{if .Expose.AppModule}} +require('../{{.Expose.AppModule}}'); +{{end}} +{{if .MainModule}} +require('../{{.MainModule}}'); +{{end}} +const app = express(); +const port = 3001; +app.use(express.json()); +app.get('/', (req, res) => { + res.sendStatus(200); +}); +app.post('/', async (req, res) => { + const params = req.body; + console.info(`Dispatched:`, params); + try { + const mode = parseMode(params.callType); + switch (mode) { + case 'rpc': + const result = await require(path.join('../', params.moduleName))[params.functionToCall].apply(null, params.params); + res.send(result); + break; + } + } + catch (err) { + console.error(`Dispatcher Failed`, err); + throw err; + } +}); +function parseMode(__callType) { + if (__callType === 'rpc') + return 'rpc'; +} +app.listen(port, () => { + console.log(`Klotho RPC Proxy listening on: ${port}`); +}); diff --git a/pkg/lang/javascript/aws_runtime/dispatcher_lambda.js.tmpl b/pkg/lang/javascript/aws_runtime/dispatcher_lambda.js.tmpl new file mode 100644 index 000000000..dbff6c0bf --- /dev/null +++ b/pkg/lang/javascript/aws_runtime/dispatcher_lambda.js.tmpl @@ -0,0 +1,124 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +exports.addInflight = void 0; +const s3fs = require('./fs'); +const serverless_express_1 = require("@vendia/serverless-express"); +const uuid = require('uuid'); +const _ = require('lodash'); +const path = require('path'); +{{if .Datadog}} +const { datadog } = require('datadog-lambda-js'); +{{end}} +{{if .Lumigo}} +const lumigo = require('@lumigo/tracer')({ token: process.env['LUMIGO_KEY'] }); +{{end}} +const inflight = new Set(); +/** + * Use this to attach background processes that need to be awaited before the lambda exits. + * This is especially useful for cleanup tasks or to bridge between synchronous APIs. + */ +function addInflight(p) { + if (p != null && p != undefined) { + inflight.add(p); + p.then(() => { + inflight.delete(p); + }); + } +} +exports.addInflight = addInflight; +async function lambdaHandler(event, context) { + console.info(`{{.ExecUnitName}} Dispatched`, event); + try { + let { __callType, __functionToCall, __moduleName, __params, path: __path } = event; + const mode = parseMode(event, __callType, __path); + if (!mode) + throw new Error(`Invalid Dispatcher Mode: ${mode}`); + const parameters = __params ? await s3fs.getCallParameters(__params, mode) : {}; + if (!parameters) + throw new Error(`Runtime Error: Expected Parameters but got none`); + let response; + switch (mode) { + case 'webserver': + response = await webserverResponse(event, context); + break; + case 'emitter': + response = await activate_emitter(event); + break; + case 'rpc': + response = await handle_rpc_call(__functionToCall, __moduleName, parameters); + break; + case 'keepWarm': + break; + } + // await kvInterface.flushAlldMaps() + return response; + } + catch (err) { + console.error(`Dispatcher Failed`, err); + throw err; + } + finally { + try { + while (inflight.size > 0) { + const promises = Array.from(inflight); + inflight.clear(); + console.info(`awaiting ${promises.length} promises before exiting`); + await Promise.all(promises); + } + } + catch (err) { + console.error('error waiting for inflight promises', err); + } + } +} +async function handle_rpc_call(__functionToCall, __moduleName, parameters) { + const result = await require(path.join('../', __moduleName))[__functionToCall].apply(null, parameters); + const payloadKey = uuid.v4(); + await s3fs.saveParametersToS3(payloadKey, result); + return payloadKey; +} +async function activate_emitter(event) { + const p = []; + for (const record of event.Records) { + console.info('Processing record', JSON.stringify(record, null, 2)); + const sns = record.Sns; + if (!sns) + continue; + const moduleName = sns.MessageAttributes.Path.Value; + const emitterName = sns.MessageAttributes.Name.Value; + const emitter = require(path.join('../', moduleName))[emitterName]; + p.push(emitter.receive(record)); + } + await Promise.all(p); +} +function parseMode(lambdaEvent, __callType, eventPathEntry) { + if (lambdaEvent.Records?.length > 0 && lambdaEvent.Records[0].Sns) + return 'emitter'; + if (eventPathEntry) + return 'webserver'; + if (__callType === 'rpc') + return 'rpc'; + if (lambdaEvent[0] == 'warmed up') + return 'keepWarm'; +} +async function webserverResponse(event, context) { + {{if and .Expose.AppModule .Expose.ExportedAppVar}} + const app = await require('../{{.Expose.AppModule}}')['{{.Expose.ExportedAppVar}}']; + return await (0, serverless_express_1.configure)({ + app: app, + binarySettings: { contentTypes: ['application/octet-stream', 'image/*'] }, + }).apply(null, [event, context]); + {{else}} + throw new Error('execution unit not configured to receive webserver payloads'); + {{end}} +} +let handler = lambdaHandler; +{{if .Datadog}} +handler = datadog(handler); +{{end}} +{{if .Lumigo}} +if (process.env['LUMIGO_KEY']) { + handler = lumigo.trace(handler); +} +{{end}} +exports.handler = handler; diff --git a/pkg/lang/javascript/aws_runtime/emitter.js.tmpl b/pkg/lang/javascript/aws_runtime/emitter.js.tmpl new file mode 100644 index 000000000..f0dbff3e7 --- /dev/null +++ b/pkg/lang/javascript/aws_runtime/emitter.js.tmpl @@ -0,0 +1,119 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +exports.Emitter = void 0; +const events = require("events"); +const client_sns_1 = require("@aws-sdk/client-sns"); +const client_s3_1 = require("@aws-sdk/client-s3"); +const uuid_1 = require("uuid"); +const crypto = require("crypto"); +// @ts-ignore +const dispatcher_1 = require("./dispatcher"); +const payloadBucketPhysicalName = process.env.KLOTHO_S3_PREFIX + '{{.PayloadsBucketName}}'; +const appName = '{{.AppName}}'; +// The account-level ARN for sns. The topics must be account-wide unique +const { SNS_ARN_BASE } = process.env; +class Emitter extends events.EventEmitter { + constructor(path, name, id) { + super(); + this.path = path; + this.name = name; + this.id = id; + this.client = new client_sns_1.SNSClient({}); + this.s3 = new client_s3_1.S3Client({}); + } + on(eventName, listener) { + // wrap the listener and add it to the inflight promises in case the listener is an async function + // otherwise a lambda will prematurely exist before the listener has run + super.on(eventName, (...args) => { + (0, dispatcher_1.addInflight)(listener(...args)); + }); + return this; + } + /** + * Must match the format used in deploylib + */ + topic(event) { + const topic = `${appName}_${this.id}_${event}`; + if (topic.length <= 256) { + return topic; + } + console.log('topic too long, hashing', { topic }); + const hash = crypto.createHash('sha256'); + hash.update(topic); + return `${hash.digest('hex')}_${event}`; + } + async save(event, ...args) { + const msgId = (0, uuid_1.v4)(); + const key = `${this.path.replace(/[^0-9a-zA-Z_-]/, '-')}_${this.name}/${event}/${msgId}`; + await this.s3.send(new client_s3_1.PutObjectCommand({ + Bucket: payloadBucketPhysicalName, + Key: key, + Body: JSON.stringify(args), + })); + return key; + } + async send(event, ...args) { + const topic = this.topic(event); + const arn = `${SNS_ARN_BASE}:${topic}`; + const payloadId = await this.save(event, ...args); + const resp = await this.client.send(new client_sns_1.PublishCommand({ + TopicArn: arn, + Message: payloadId, + MessageAttributes: { + Path: { + DataType: 'String', + StringValue: this.path, + }, + Name: { + DataType: 'String', + StringValue: this.name, + }, + Event: { + DataType: 'String', + StringValue: event, + }, + }, + })); + console.info('Sent message', { + event, + topic, + arn, + payloadId, + messageId: resp.MessageId, + }); + } + /** + * @param record see https://docs.aws.amazon.com/lambda/latest/dg/with-sns.html + */ + async receive(record) { + const { Message: payloadId, MessageAttributes: attribs } = record.Sns; + const eventName = attribs.Event.Value; + const obj = await this.s3.send(new client_s3_1.GetObjectCommand({ + Bucket: payloadBucketPhysicalName, + Key: payloadId, + })); + if (!obj.Body) + return; + const argsStr = await streamToString(obj.Body); + // TODO - would be nice to keep these around for a little for debugging/auditing purposes. + const del = this.s3.send(new client_s3_1.DeleteObjectCommand({ + Bucket: payloadBucketPhysicalName, + Key: payloadId, + })); + (0, dispatcher_1.addInflight)(del); + const args = JSON.parse(argsStr); + this.emit(eventName, ...args); + } +} +exports.Emitter = Emitter; +/** + * see /~https://github.com/aws/aws-sdk-js-v3/issues/1877#issuecomment-755446927 + */ +async function streamToString(stream) { + return await new Promise((resolve, reject) => { + const chunks = []; + stream.on('data', (chunk) => chunks.push(chunk)); + stream.on('error', reject); + stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf-8'))); + }); +} diff --git a/pkg/lang/javascript/aws_runtime/fs.js.tmpl b/pkg/lang/javascript/aws_runtime/fs.js.tmpl new file mode 100644 index 000000000..d8127bdd0 --- /dev/null +++ b/pkg/lang/javascript/aws_runtime/fs.js.tmpl @@ -0,0 +1,155 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +const _ = require('lodash'); +const path = require('path'); +const client_s3_1 = require("@aws-sdk/client-s3"); +const payloadBucketPhysicalName = process.env.KLOTHO_S3_PREFIX + '{{.PayloadsBucketName}}'; +const targetRegion = process.env['AWS_TARGET_REGION']; +const userBucketPath = '/files'; +const s3Client = new client_s3_1.S3Client({ region: targetRegion }); +const streamToString = (stream) => new Promise((resolve, reject) => { + const chunks = []; + stream.on('data', (chunk) => chunks.push(chunk)); + stream.on('error', reject); + stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf8'))); +}); +async function getCallParameters(paramKey, dispatcherMode) { + let isEmitter = dispatcherMode === 'emitter' ? true : false; + try { + const bucketParams = { + Bucket: payloadBucketPhysicalName, + Key: paramKey, + }; + const result = await s3Client.send(new client_s3_1.GetObjectCommand(bucketParams)); + let parameters = ''; + if (result.Body) { + parameters = await streamToString(result.Body); + console.log(parameters); + } + if (parameters != '') { + parameters = JSON.parse(parameters); + } + console.log(parameters); + if (isEmitter && Array.isArray(parameters)) { + // Emitters only have 1 parameter - the runtime saves an array, so we + // normalize the parameter + parameters = parameters[0]; + if (Array.isArray(parameters)) { + let paramPairs = Object.entries(parameters); + paramPairs = paramPairs.map((x) => { + if (x[1].type == 'Buffer') { + return [x[0], x[1].data]; + } + else { + return x; + } + }); + parameters = Object.entries(paramPairs); + } + } + return parameters || {}; + } + catch (e) { + console.error(e); + throw e; + } +} +exports.getCallParameters = getCallParameters; +async function saveParametersToS3(paramsS3Key, params) { + try { + const bucketParams = { + Bucket: payloadBucketPhysicalName, + Key: paramsS3Key, + Body: JSON.stringify(params), + }; + await s3Client.send(new client_s3_1.PutObjectCommand(bucketParams)); + } + catch (e) { + console.error(e); + throw e; + } +} +exports.saveParametersToS3 = saveParametersToS3; +async function s3_writeFile(...args) { + const bucketParams = { + Bucket: payloadBucketPhysicalName, + Key: `${userBucketPath}/${args[0]}`, + Body: args[1], + }; + try { + await s3Client.send(new client_s3_1.PutObjectCommand(bucketParams)); + console.debug('Successfully uploaded object: ' + bucketParams.Bucket + '/' + bucketParams.Key); + } + catch (err) { + console.log('Error', err); + throw err; + } +} +async function s3_readFile(...args) { + const bucketParams = { + Bucket: payloadBucketPhysicalName, + Key: `${userBucketPath}/${args[0]}`, + }; + try { + // Get the object from the Amazon S3 bucket. It is returned as a ReadableStream. + const data = await s3Client.send(new client_s3_1.GetObjectCommand(bucketParams)); + if (data.Body) { + return await streamToString(data.Body); + } + return ''; + } + catch (err) { + console.log('Error', err); + throw err; + } +} +async function s3_readdir(path) { + const bucketParams = { + Bucket: payloadBucketPhysicalName, + Prefix: `${userBucketPath}/${path}`, + }; + try { + const data = await s3Client.send(new client_s3_1.ListObjectsCommand(bucketParams)); + if (data.Contents) { + const objectKeys = data.Contents.map((c) => c.Key); + console.debug('Success', objectKeys); + return objectKeys; + } + } + catch (err) { + console.log('Error', err); + throw err; + } +} +async function s3_exists(fpath) { + const bucketParams = { Bucket: payloadBucketPhysicalName, Key: `${userBucketPath}/${path}` }; + try { + const data = await s3Client.send(new client_s3_1.HeadObjectCommand(bucketParams)); + console.debug('Success. Object deleted.', data); + return data; // For unit tests. + } + catch (err) { + console.log('Error', err); + throw err; + } +} +async function s3_deleteFile(fpath) { + const bucketParams = { Bucket: payloadBucketPhysicalName, Key: `${userBucketPath}/${path}` }; + try { + const data = await s3Client.send(new client_s3_1.DeleteObjectCommand(bucketParams)); + console.debug('Success. Object deleted.', data); + return data; // For unit tests. + } + catch (err) { + console.log('Error', err); + throw err; + } +} +exports.fs = { + writeFile: s3_writeFile, + readFile: s3_readFile, + readdir: s3_readdir, + access: s3_exists, + rm: s3_deleteFile, +}; +exports.fs.promises = exports.fs; diff --git a/pkg/lang/javascript/aws_runtime/keyvalue.js.tmpl b/pkg/lang/javascript/aws_runtime/keyvalue.js.tmpl new file mode 100644 index 000000000..ce24d8ee7 --- /dev/null +++ b/pkg/lang/javascript/aws_runtime/keyvalue.js.tmpl @@ -0,0 +1,232 @@ +//@ts-nocheck +'use strict'; +Object.defineProperty(exports, "__esModule", { value: true }); +exports.dMap = void 0; +const _ = require("lodash"); +const moment = require("moment"); +const clients_1 = require("./clients"); +const lib_dynamodb_1 = require("@aws-sdk/lib-dynamodb"); +const { DynamoDBClient } = require('@aws-sdk/client-dynamodb'); +const docClient = new lib_dynamodb_1.DynamoDBDocumentClient(new DynamoDBClient(clients_1.AWSConfig)); +let alldMaps = []; +const electrodb_1 = require("electrodb"); +const KVStore = new electrodb_1.Entity({ + model: { + entity: 'entry', + version: '1', + service: 'store', + }, + attributes: { + map_id: { + type: 'string', + required: true, + }, + kv_key: { + type: 'string', + required: true, + }, + kv_value: { + type: 'any', + }, + expiration: { + type: 'number', + }, + }, + indexes: { + kv: { + pk: { + field: 'pk', + composite: ['map_id'], + }, + sk: { + field: 'sk', + composite: ['kv_key'], + }, + }, + }, + filters: {}, +}, { table: '{{.AppName}}', client: docClient }); +class dMap { + constructor(opts) { + this.dynamoCalls = 0; + this.nonCachedFunctionCalls = 0; + this.allFunctionCalls = 0; + this.deletedKeys = []; + this.opts = { + id: '', + batch_write: false, + write_on_change: true, + ...opts, + }; + this._cache = new Map(); + alldMaps.push(this); + } + emptyCache() { + this._cache.clear(); + } + async get(key) { + try { + this.allFunctionCalls += 1; + this.nonCachedFunctionCalls += 1; + let dbValue = (await KVStore.query.kv({ kv_key: key, map_id: this.opts.id }).go()).data; + let value = dbValue?.[0]?.kv_value; + if (value == '_DELETED') { + return undefined; + } + // value = this._restoreUndefinedValues(value); + this._cache.set(key, value); + return value; + } + catch (error) { + console.error('CloudCC Runtime error'); + console.error(error); + } + } + async has(key) { + return typeof (await this.get(key)) !== 'undefined'; + } + /** + * + * @param key + * @param value + * @param ttl time-to-live, seconds + */ + async set(key, value) { + if (key == 'options') + return this; // reserved keyword that functions as the constructor + //TODO: Need to calculate and manage Deltas to avoid continued growth of what we send + // to be batched + if (key != 'flush') + this._cache.set(key, value); + if (typeof value == 'object' && this.opts.versioned) { + const v = value; + let whereFunc; + const hadVersion = '__version' in v; + if (hadVersion) { + v.__version++; + whereFunc = ({ kv_value }, { eq }) => eq(kv_value.__version, v.__version - 1); + } + else { + v.__version = 0; + whereFunc = ({ kv_value }, { notExists }) => notExists(kv_value); + } + try { + await KVStore.put(this.toKVObject(key, v)).where(whereFunc).go(); + } + catch (err) { + if (err.message.includes('conditional request failed')) { + if (hadVersion) { + throw new Error(`Conditional put failed: expected version ${v.__version - 1} did not match`); + } + else { + throw new Error('Conditional put failed: expected item to not exist'); + } + } + else { + throw err; + } + } + return this; + } + if (this.opts.batch_write == false && this.opts.write_on_change == true) { + // Every time a change happens in the KV write to Dynamo immediately + await this.flushEntries([[key, value]]); + } + else if (key == 'flush' && + this.opts.batch_write == false && + this.opts.write_on_change == false) { + // On lambda exit, write final KV updates to Dynamo (no local intermediates) + await this.flushEntries(Array.from(this._cache.entries())); + } + return this; + } + async flushEntries(entriesToFlush) { + const cachedObjects = entriesToFlush.map(([key, value]) => this.toKVObject(key, value)); + try { + await KVStore.put(cachedObjects).go(); + } + catch (e) { + console.log(e); + } + } + expiration() { + if (this.opts.ttl) { + return moment().add(this.opts.ttl, 'seconds').unix(); + } + return undefined; + } + toKVObject(key, value) { + return { + map_id: this.opts.id, + kv_key: key, + kv_value: value, + expiration: this.expiration(), + }; + } + async delete(key) { + if (key == 'options') + return true; // reserved keyword that functions as the constructor + if (key == 'flush') { + await this._cache.delete(key); + return true; + } + this.deletedKeys.push(key); + // We don't actually delete keys - bad practice. We only allow clearing the entire set. We filter out the deleted ones later + await this.set(key, '_DELETED'); + return await this._cache.delete(key); + } + async keys() { + this.allFunctionCalls += 1; + try { + this.nonCachedFunctionCalls += 1; + this.dynamoCalls += 1; + const keyResults = (await KVStore.query.kv({ map_id: this.opts.id }).go()).data; + const filteredKeys = _.uniq([ + ...keyResults.filter((x) => x.kv_value != '_DELETED').map((x) => x.kv_key), + ...this._cache.keys(), + ]); + this.deletedKeys.forEach((key) => { + _.remove(filteredKeys, (x) => x == key); + }); + return filteredKeys.map((x) => x); + } + catch (e) { + console.error(e); + throw new Error(`CloudCompiler runtime error:`); + } + } + async entries() { + this.nonCachedFunctionCalls += 1; + this.dynamoCalls += 1; + try { + let keyResults = (await KVStore.query.kv({ map_id: this.opts.id }).go()).data; + this.deletedKeys.forEach((key) => { + _.remove(keyResults, (x) => x.kv_key == key); + }); + keyResults = keyResults.filter((x) => x.kv_value != '_DELETED'); //.map(x => [x.kv_key, this._restoreUndefinedValues(x.kv_value)]) + keyResults.map((kvPair) => { + if (this._cache.has(kvPair.kv_key)) + return; + this._cache.set(kvPair.kv_key, kvPair.kv_value); + }); + return this._cache.entries(); + } + catch (e) { + console.error(e); + throw new Error(`CloudCompiler runtime error:`); + } + } + async clear() { + try { + let entries = await KVStore.query.kv({ map_id: this.opts.id }).go(); + let result = await KVStore.delete(entries.data).go(); + this._cache.clear(); + return true; + } + catch (err) { + console.log(err); + return undefined; + } + } +} +exports.dMap = dMap; diff --git a/pkg/lang/javascript/aws_runtime/orm.js.tmpl b/pkg/lang/javascript/aws_runtime/orm.js.tmpl new file mode 100644 index 000000000..5a1f69b61 --- /dev/null +++ b/pkg/lang/javascript/aws_runtime/orm.js.tmpl @@ -0,0 +1,23 @@ +//@ts-nocheck +'use strict'; +Object.defineProperty(exports, "__esModule", { value: true }); +exports.getDataSourceParams = exports.getDBConn = void 0; +const ormPrefix = '{{.AppName}}'; +function getDBConn(dbNameEnvVar) { + const conn = process.env[dbNameEnvVar]; + return conn; +} +exports.getDBConn = getDBConn; +function getDataSourceParams(dbNameEnvVar, params) { + let newParams = { ...params }; + const fieldsToDelete = ['host', 'type', 'port', 'username', 'passowrd', 'database']; + for (const field of fieldsToDelete) { + delete newParams[field]; + } + return { + ...newParams, + type: 'postgres', + url: getDBConn(dbNameEnvVar), + }; +} +exports.getDataSourceParams = getDataSourceParams; diff --git a/pkg/lang/javascript/aws_runtime/proxy_eks.js.tmpl b/pkg/lang/javascript/aws_runtime/proxy_eks.js.tmpl new file mode 100644 index 000000000..8c94c3517 --- /dev/null +++ b/pkg/lang/javascript/aws_runtime/proxy_eks.js.tmpl @@ -0,0 +1,53 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +exports.proxyCall = void 0; +const axios = require('axios'); +const client_servicediscovery_1 = require("@aws-sdk/client-servicediscovery"); +const { APP_NAME } = process.env; +async function proxyCall(callType, execGroupName, moduleName, functionToCall, params) { + try { + const hostname = await getEksServiceIp(execGroupName); + const res = await axios({ + method: 'post', + url: `http://${hostname}:3001`, + data: { + callType, + execGroupName, + functionToCall, + moduleName, + params, + }, + }); + return res.data; + } + catch (error) { + console.log(error); + throw error; + } +} +exports.proxyCall = proxyCall; +async function getEksServiceIp(logicalName) { + try { + const client = new client_servicediscovery_1.ServiceDiscoveryClient({}); + const command = new client_servicediscovery_1.DiscoverInstancesCommand({ + NamespaceName: `default`, + ServiceName: logicalName, + }); + const response = await client.send(command); + const ips = response.Instances?.reduce((ips, instance) => { + const ip = instance.Attributes?.AWS_INSTANCE_IPV4; + if (ip) { + ips.push(ip); + } + return ips; + }, []); + if (ips == undefined || ips.length == 0) { + throw new Error(`No IPs found for ${logicalName}`); + } + return ips[0]; + } + catch (e) { + console.log(e); + throw e; + } +} diff --git a/pkg/lang/javascript/aws_runtime/proxy_fargate.js.tmpl b/pkg/lang/javascript/aws_runtime/proxy_fargate.js.tmpl new file mode 100644 index 000000000..a24329901 --- /dev/null +++ b/pkg/lang/javascript/aws_runtime/proxy_fargate.js.tmpl @@ -0,0 +1,53 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +exports.proxyCall = void 0; +const axios = require('axios'); +const client_servicediscovery_1 = require("@aws-sdk/client-servicediscovery"); +const { APP_NAME } = process.env; +async function proxyCall(callType, execGroupName, moduleName, functionToCall, params) { + try { + const hostname = await getExecFargateInstance(execGroupName); + const res = await axios({ + method: 'post', + url: `http://${hostname}:3001`, + data: { + callType, + execGroupName, + functionToCall, + moduleName, + params, + }, + }); + return res.data; + } + catch (error) { + console.log(error); + throw error; + } +} +exports.proxyCall = proxyCall; +async function getExecFargateInstance(logicalName) { + try { + const client = new client_servicediscovery_1.ServiceDiscoveryClient({}); + const command = new client_servicediscovery_1.DiscoverInstancesCommand({ + NamespaceName: `${APP_NAME}-privateDns`, + ServiceName: logicalName, + }); + const response = await client.send(command); + const ips = response.Instances?.reduce((ips, instance) => { + const ip = instance.Attributes?.AWS_INSTANCE_IPV4; + if (ip) { + ips.push(ip); + } + return ips; + }, []); + if (ips == undefined || ips.length == 0) { + throw new Error(`No IPs found for ${logicalName}`); + } + return ips[0]; + } + catch (e) { + console.log(e); + throw e; + } +} diff --git a/pkg/lang/javascript/aws_runtime/redis_cluster.js.tmpl b/pkg/lang/javascript/aws_runtime/redis_cluster.js.tmpl new file mode 100644 index 000000000..b6b28049b --- /dev/null +++ b/pkg/lang/javascript/aws_runtime/redis_cluster.js.tmpl @@ -0,0 +1,36 @@ +//@ts-nocheck +'use strict'; +Object.defineProperty(exports, "__esModule", { value: true }); +exports.getParams = void 0; +// The cluster client requires root nodes and defaults to be able to properly connect and get redirected to new slots in memorydb +function getParams(hostEnvVarName, portEnvVarName, params) { + const socketDefaults = {}; + if (params['defaults']?.socket) { + socketDefaults = params['defaults'].socket; + } + let newParams = { + ...params, + rootNodes: [ + { + socket: { + host: `${process.env[hostEnvVarName]}`, + port: `${process.env[portEnvVarName]}`, + tls: true, + }, + }, + ], + defaults: { + ...params['defaults'], + socket: { + ...socketDefaults, + host: `${process.env[hostEnvVarName]}`, + port: `${process.env[portEnvVarName]}`, + tls: true, + }, + }, + }; + return { + ...newParams, + }; +} +exports.getParams = getParams; diff --git a/pkg/lang/javascript/aws_runtime/redis_node.js.tmpl b/pkg/lang/javascript/aws_runtime/redis_node.js.tmpl new file mode 100644 index 000000000..bb23038a4 --- /dev/null +++ b/pkg/lang/javascript/aws_runtime/redis_node.js.tmpl @@ -0,0 +1,18 @@ +//@ts-nocheck +'use strict'; +Object.defineProperty(exports, "__esModule", { value: true }); +exports.getParams = void 0; +function getParams(hostEnvVarName, portEnvVarName, params) { + let newParams = { + ...params, + socket: { + ...params['socket'], + host: process.env[hostEnvVarName], + port: process.env[portEnvVarName], + }, + }; + return { + ...newParams, + }; +} +exports.getParams = getParams; diff --git a/pkg/lang/javascript/aws_runtime/secret.js.tmpl b/pkg/lang/javascript/aws_runtime/secret.js.tmpl new file mode 100644 index 000000000..823621b4e --- /dev/null +++ b/pkg/lang/javascript/aws_runtime/secret.js.tmpl @@ -0,0 +1,24 @@ +//@ts-nocheck +'use strict'; +Object.defineProperty(exports, "__esModule", { value: true }); +exports.readFile = void 0; +const client_secrets_manager_1 = require("@aws-sdk/client-secrets-manager"); +const client = new client_secrets_manager_1.SecretsManagerClient({}); +const secretPrefix = '{{.AppName}}'; +async function readFile(path) { + try { + const cmd = new client_secrets_manager_1.GetSecretValueCommand({ SecretId: `${secretPrefix}-${path}` }); + const data = await client.send(cmd); + if (data.SecretBinary) { + return Buffer.from(data.SecretBinary); + } + if (data.SecretString) { + return Buffer.from(data.SecretString, 'utf-8'); + } + throw new Error(`Empty secret for ${path}`); + } + catch (err) { + throw new Error(`Could not read secret '${path}'`); + } +} +exports.readFile = readFile; diff --git a/pkg/lang/python/aws_runtime/.gitignore b/pkg/lang/python/aws_runtime/.gitignore deleted file mode 100644 index 5c8b1b342..000000000 --- a/pkg/lang/python/aws_runtime/.gitignore +++ /dev/null @@ -1,4 +0,0 @@ -dispatcher_fargate.py.tmpl -dispatcher_lambda.py.tmpl -fs.py.tmpl -secret.py.tmpl diff --git a/pkg/lang/python/aws_runtime/compile_template.sh b/pkg/lang/python/aws_runtime/compile_template.sh index 9fd3d8c3e..be90bddca 100755 --- a/pkg/lang/python/aws_runtime/compile_template.sh +++ b/pkg/lang/python/aws_runtime/compile_template.sh @@ -10,8 +10,4 @@ do cp ${var}.py ${var}.py.tmpl ksed 's:#TMPL ::g' ${var}.py.tmpl echo "generated ${var}.py.tmpl" - echo ${var}.py.tmpl >> .gitignore done - -sort -u .gitignore > gitignore-tmp -mv gitignore-tmp .gitignore diff --git a/pkg/lang/python/aws_runtime/dispatcher_fargate.py.tmpl b/pkg/lang/python/aws_runtime/dispatcher_fargate.py.tmpl new file mode 100644 index 000000000..9828ed45f --- /dev/null +++ b/pkg/lang/python/aws_runtime/dispatcher_fargate.py.tmpl @@ -0,0 +1,111 @@ +import logging +import os +import multiprocessing +import types +import inspect + +app_port = os.getenv("KLOTHO_APP_PORT", 3000) +log_level = os.getenv("KLOTHO_LOG_LEVEL", "DEBUG").upper() +uvicorn_log_level = os.getenv("UVICORN_LOG_LEVEL", "info").lower() +host = "0.0.0.0" + +logging.basicConfig() +log = logging.getLogger("klotho") +log.setLevel(logging.DEBUG) + + +def userland_main(): + main_module_name = "{{.Expose.AppModule}}" + if not main_module_name: + log.info("No main defined. Will only listen as a proxy server for RPC calls.") + return + entrypoint = try_import(main_module_name) + + if entrypoint is None: + log.error("Startup failed: No entrypoint found!") + exit(1) + + uvicorn = try_import("uvicorn") + fastapi = try_import("fastapi") + + run_fastapi_app = None + if uvicorn is not None and fastapi is not None: + run_fastapi_app = run_fastapi_app_func(uvicorn=uvicorn, fastapi=fastapi, entrypoint=entrypoint) + + if run_fastapi_app is not None: + log.debug("Starting FastAPI app...") + run_fastapi_app() + else: + try: + entrypoint.__klotho_main__() + except AttributeError as err: + log.error(err) + + +def start_proxy_server(): + import fastapi + import uvicorn + klotho_proxy_app = fastapi.FastAPI() + + @klotho_proxy_app.get("/") + async def proxy_root_get(): + return + + @klotho_proxy_app.post("/") + async def proxy_root_post(obj: dict): + module_name, function_name, params = obj['module_name'], obj['function_to_call'], obj['params'] + module_obj = try_import(module_name) + if not module_obj: + raise Exception(f"couldn't find module: {module_name}") + function = getattr(module_obj, function_name, None) + if not function: + raise Exception(f"couldn't find function: {module_name}.{function_name}") + param_args = () + param_kwargs = {} + args_spec = inspect.getfullargspec(function) + if args_spec.varkw: + param_kwargs, params = params[-1], params[:-1] + if args_spec.varargs: + param_args, params = params[-1], params[:-1] + result = function(*params, *param_args, **param_kwargs) + if isinstance(result, types.CoroutineType): + result = await result + return result + + uvicorn.run( + klotho_proxy_app, + host=host, + port=3001, + log_level=uvicorn_log_level) + + +def try_import(module_name): + from importlib import import_module + try: + return import_module(module_name) + except ModuleNotFoundError as e: + log.debug(f"{module_name} could not be imported: {e}") + + +def run_fastapi_app_func(uvicorn, fastapi, entrypoint): + fastapi_type = fastapi.FastAPI + api = getattr(entrypoint, "{{.Expose.ExportedAppVar}}") + if type(api) is fastapi_type: + def func(): + uvicorn.run( + f"{entrypoint.__name__}:{{.Expose.ExportedAppVar}}", + host=host, + port=app_port, + log_level=uvicorn_log_level) + + return func + else: + log.debug("No FastAPI apps detected.") + + +if __name__ == "__main__": + subprocesses = [multiprocessing.Process(target=m) for m in [userland_main, start_proxy_server]] + for sp in subprocesses: + sp.start() + for sp in subprocesses: + sp.join() diff --git a/pkg/lang/python/aws_runtime/dispatcher_lambda.py.tmpl b/pkg/lang/python/aws_runtime/dispatcher_lambda.py.tmpl new file mode 100644 index 000000000..ff05715a2 --- /dev/null +++ b/pkg/lang/python/aws_runtime/dispatcher_lambda.py.tmpl @@ -0,0 +1,96 @@ +from . import fs as s3fs +import asyncio +import json +import logging +import os +import types +import uuid +import inspect + +log_level = os.getenv("KLOTHO_LOG_LEVEL", "DEBUG").upper() + +logging.basicConfig() +log = logging.getLogger("klotho") +log.setLevel(logging.DEBUG) + +asgi_handler = None + + +def handler(event, context): + request_handler = get_handler(event) + if not request_handler: + raise Exception("this request could not be handled: no handler found") + + result = request_handler(event, context) + if isinstance(result, types.CoroutineType): + result = asyncio.run(result) + return result + + +def init_asgi_handler(): + global asgi_handler + + entrypoint = try_import("{{.Expose.AppModule}}") + + if entrypoint is None: + raise Exception("startup failed: no entrypoint found") + + fastapi = try_import("fastapi") + mangum = try_import("mangum") + if not fastapi or not mangum: + return + + app = get_fastapi_app(fastapi, entrypoint) + asgi_handler = mangum.Mangum(app) + return asgi_handler + + +async def rpc_handler(event, _context): + payload_key = event.get('params') + async with s3fs.open(payload_key) as f: + params = json.loads(await f.read()) + module_obj = try_import(event.get('module_name')) + if not module_obj: + raise Exception("couldn't find module for path: {module_path}") + function = getattr(module_obj, event.get('function_to_call')) + param_args = () + param_kwargs = {} + args_spec = inspect.getfullargspec(function) + if args_spec.varkw: + param_kwargs, params = params[-1], params[:-1] + if args_spec.varargs: + param_args, params = params[-1], params[:-1] + result = function(*params, *param_args, **param_kwargs) + if isinstance(result, types.CoroutineType): + result = await result + + result_payload_key = str(uuid.uuid4()) + async with s3fs.open(result_payload_key, mode='w') as f: + await f.write(json.dumps(result)) + return result_payload_key + + +def get_handler(event): + if "httpMethod" in event: + return asgi_handler if asgi_handler else init_asgi_handler() + elif "module_name" in event: + return rpc_handler + else: + raise Exception(f'unsupported invocation. event keys: {list(event.keys())}') + + +def try_import(module_name): + from importlib import import_module + try: + return import_module(module_name) + except ModuleNotFoundError as e: + log.warning(f"{module_name} could not be imported: {e}") + + +def get_fastapi_app(fastapi, entrypoint): + fastapi_type = fastapi.FastAPI + app = getattr(entrypoint, "{{.Expose.ExportedAppVar}}") + if type(app) is fastapi_type: + return app + else: + log.warning("No FastAPI apps detected.") diff --git a/pkg/lang/python/aws_runtime/fs.py.tmpl b/pkg/lang/python/aws_runtime/fs.py.tmpl new file mode 100644 index 000000000..85bf1a87e --- /dev/null +++ b/pkg/lang/python/aws_runtime/fs.py.tmpl @@ -0,0 +1,53 @@ +import os + +import boto3 + +payloadBucketPhysicalName = os.getenv("KLOTHO_S3_PREFIX") + "{{.PayloadsBucketName}}" + + +def open(url: str, **kwargs): + return s3ContextManager(str(url), **kwargs) + + +class s3ContextManager(object): + def __init__(self, file_name: str, **kwargs): + self.client = boto3.client('s3') + self.bucket_name = payloadBucketPhysicalName + self.file_name = file_name + self.kwargs = kwargs + + async def __aenter__(self): + return FsItem(bucket_name=self.bucket_name, file_name=self.file_name, client=self.client, **self.kwargs) + + async def __aexit__(self, exc_type, exc_val, traceback): + pass + + +class FsItem(object): + def __init__(self, file_name: str, bucket_name: str, client: boto3.client, **kwargs): + self.client = client + self.bucket_name = bucket_name + self.file_name = file_name + mode = kwargs.get("mode", "r") + self.is_readable = "r" in mode + self.is_writeable = "w" in mode or "+" in mode or "x" in mode + self.is_binary = "b" in mode + + if "a" in mode: + raise IOError('@klotho::persist does not support append mode') + + self.encoding = kwargs.get("encoding", "utf-8") + + async def write(self, content: str): + if not self.is_writeable: + raise IOError(f"{self.file_name} is not writeable") + self.client.put_object(Key=self.file_name, Bucket=self.bucket_name, Body=content) + + async def read(self): + if not self.is_readable: + raise IOError(f"{self.file_name} is not readable") + response = self.client.get_object(Key=self.file_name, Bucket=self.bucket_name) + body = response["Body"].read() + if not self.is_binary: + body = body.decode(self.encoding) + return body diff --git a/pkg/lang/python/aws_runtime/secret.py.tmpl b/pkg/lang/python/aws_runtime/secret.py.tmpl new file mode 100644 index 000000000..3d262dbf6 --- /dev/null +++ b/pkg/lang/python/aws_runtime/secret.py.tmpl @@ -0,0 +1,30 @@ +import boto3 + +secretPrefix = '{{.AppName}}' + +def open(path: str, **kwargs): + return secretsContexManager(path) + +class secretsContexManager(): + def __init__(self, path: str): + self.client = boto3.client('secretsmanager') + self.secret_name = "{}-{}".format(secretPrefix, path) + + async def __aenter__(self): + return SecretItem(name=self.secret_name, client=self.client) + + async def __aexit__(self, exc_type, exc_val, traceback): + pass + +class SecretItem(): + def __init__(self, name: str, client: boto3.client, **kwargs): + self.client = client + self.name = name + + async def read(self): + response = self.client.get_secret_value(SecretId=self.name) + if response["SecretBinary"]: + return response["SecretBinary"].decode('utf8') + elif response["SecretString"]: + return response["SecretString"] + raise Exception("Empty Secret") \ No newline at end of file