Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Set up env config #17

Merged
merged 13 commits into from
Nov 25, 2024
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,9 @@ pnpm start

```sh
export N8N_LAUNCHER_LOG_LEVEL=debug
export N8N_RUNNERS_N8N_URI=... # e.g. http://127.0.0.1:5679
export N8N_MAIN_URI=... # e.g. http://127.0.0.1:5678
export N8N_TASK_BROKER_URI=... # e.g. http://127.0.0.1:5679
export N8N_RUNNER_URI=... # e.g. http://127.0.0.1:5680
export N8N_RUNNERS_AUTH_TOKEN=... # i.e. same string as in step 4

make run
Expand Down
5 changes: 0 additions & 5 deletions internal/auth/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/hex"
"fmt"
"net/url"
"strings"
"task-runner-launcher/internal/logs"

"github.com/gorilla/websocket"
Expand Down Expand Up @@ -53,10 +52,6 @@ func validateConfig(cfg HandshakeConfig) error {
}

func buildWebsocketURL(n8nURI, runnerID string) (*url.URL, error) {
if !strings.HasPrefix(n8nURI, "http://") && !strings.HasPrefix(n8nURI, "https://") {
n8nURI = "http://" + n8nURI
}

u, err := url.Parse(n8nURI)
if err != nil {
return nil, fmt.Errorf("invalid n8n URI: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion internal/auth/token_exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type grantTokenResponse struct {
}

func sendGrantTokenRequest(n8nURI, authToken string) (string, error) {
url := fmt.Sprintf("http://%s/runners/auth", n8nURI)
url := fmt.Sprintf("%s/runners/auth", n8nURI)

payload := map[string]string{"token": authToken}
payloadBytes, err := json.Marshal(payload)
Expand Down
51 changes: 16 additions & 35 deletions internal/commands/launch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"os"
"os/exec"
"strconv"
"task-runner-launcher/internal/auth"
"task-runner-launcher/internal/config"
"task-runner-launcher/internal/env"
Expand All @@ -16,45 +15,27 @@ type LaunchCommand struct {
RunnerType string
}

const idleTimeoutEnvVar = "N8N_RUNNERS_AUTO_SHUTDOWN_TIMEOUT"
const defaultIdleTimeoutValue = "15" // seconds

func (l *LaunchCommand) Execute() error {
logs.Info("Starting to execute `launch` command")

authToken := os.Getenv("N8N_RUNNERS_AUTH_TOKEN")
n8nRunnerServerURI := os.Getenv("N8N_RUNNERS_N8N_URI")
n8nMainServerURI := os.Getenv("N8N_MAIN_URI")
idleTimeout := os.Getenv(idleTimeoutEnvVar)

if authToken == "" || n8nRunnerServerURI == "" {
return fmt.Errorf("both N8N_RUNNERS_AUTH_TOKEN and N8N_RUNNERS_N8N_URI are required")
}

if n8nMainServerURI == "" {
return fmt.Errorf("N8N_MAIN_URI is required")
}
// 0. validate env vars

if idleTimeout == "" {
os.Setenv(idleTimeoutEnvVar, defaultIdleTimeoutValue)
} else {
idleTimeoutInt, err := strconv.Atoi(idleTimeout)
if err != nil || idleTimeoutInt < 0 {
return fmt.Errorf("%s must be a non-negative integer", idleTimeoutEnvVar)
}
envCfg, err := env.FromEnv()
if err != nil {
return fmt.Errorf("env vars failed validation: %w", err)
}

// 1. read configuration

cfg, err := config.ReadConfig()
fileCfg, err := config.ReadConfig()
if err != nil {
logs.Errorf("Error reading config: %v", err)
logs.Errorf("Error reading config file: %v", err)
return err
}

var runnerConfig config.TaskRunnerConfig
found := false
for _, r := range cfg.TaskRunners {
for _, r := range fileCfg.TaskRunners {
if r.RunnerType == l.RunnerType {
runnerConfig = r
found = true
Expand All @@ -66,12 +47,12 @@ func (l *LaunchCommand) Execute() error {
return fmt.Errorf("config file does not contain requested runner type: %s", l.RunnerType)
}

cfgNum := len(cfg.TaskRunners)
taskRunnersNum := len(fileCfg.TaskRunners)

if cfgNum == 1 {
if taskRunnersNum == 1 {
logs.Debug("Loaded config file loaded with a single runner config")
} else {
logs.Debugf("Loaded config file with %d runner configs", cfgNum)
logs.Debugf("Loaded config file with %d runner configs", taskRunnersNum)
}

// 2. change into working directory
Expand All @@ -84,22 +65,22 @@ func (l *LaunchCommand) Execute() error {

// 3. filter environment variables

defaultEnvs := []string{"LANG", "PATH", "TZ", "TERM", idleTimeoutEnvVar}
defaultEnvs := []string{"LANG", "PATH", "TZ", "TERM", env.EnvVarIdleTimeout, env.EnvVarRunnerServerEnabled}
allowedEnvs := append(defaultEnvs, runnerConfig.AllowedEnv...)
runnerEnv := env.AllowedOnly(allowedEnvs)

logs.Debugf("Filtered environment variables")

// 4. wait for n8n instance to be ready

if err := http.WaitForN8nReady(n8nMainServerURI); err != nil {
if err := http.WaitForN8nReady(envCfg.MainServerURI); err != nil {
return fmt.Errorf("encountered error while waiting for n8n to be ready: %w", err)
}

for {
// 5. fetch grant token for launcher

launcherGrantToken, err := auth.FetchGrantToken(n8nRunnerServerURI, authToken)
launcherGrantToken, err := auth.FetchGrantToken(envCfg.TaskBrokerServerURI, envCfg.AuthToken)
if err != nil {
return fmt.Errorf("failed to fetch grant token for launcher: %w", err)
}
Expand All @@ -110,7 +91,7 @@ func (l *LaunchCommand) Execute() error {

handshakeCfg := auth.HandshakeConfig{
TaskType: l.RunnerType,
N8nURI: n8nRunnerServerURI,
N8nURI: envCfg.TaskBrokerServerURI,
GrantToken: launcherGrantToken,
}

Expand All @@ -120,7 +101,7 @@ func (l *LaunchCommand) Execute() error {

// 7. fetch grant token for runner

runnerGrantToken, err := auth.FetchGrantToken(n8nRunnerServerURI, authToken)
runnerGrantToken, err := auth.FetchGrantToken(envCfg.TaskBrokerServerURI, envCfg.AuthToken)
if err != nil {
return fmt.Errorf("failed to fetch grant token for runner: %w", err)
}
Expand Down Expand Up @@ -149,6 +130,6 @@ func (l *LaunchCommand) Execute() error {
}

// next runner will need to fetch a new grant token
runnerEnv = env.Clear(runnerEnv, "N8N_RUNNERS_GRANT_TOKEN")
runnerEnv = env.Clear(runnerEnv, env.EnvVarGrantToken)
}
}
116 changes: 116 additions & 0 deletions internal/env/env.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,62 @@
package env

import (
"errors"
"fmt"
"net/url"
"os"
"strconv"
"strings"
)

const (
// ------------------------
// auth
// ------------------------
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could use

// #region auth
...
// #endregion auth

to make these foldable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apparently not supported for Go? https://share.cleanshot.com/5lyhDHfR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I thought I tested it and it worked but seems it only folded the start of the comment :/ Yeah doesn't seem to be supported


// EnvVarAuthToken is the env var for the auth token sent by the launcher to
// the main instance in exchange for a single-use grant token.
// nolint:gosec // G101: False positive
EnvVarAuthToken = "N8N_RUNNERS_AUTH_TOKEN"

// EnvVarGrantToken is the env var for the single-use grant token returned by
// the main instance to the launcher in exchange for the auth token.
// nolint:gosec // G101: False positive
EnvVarGrantToken = "N8N_RUNNERS_GRANT_TOKEN"

// ------------------------
// n8n main
// ------------------------

// EnvVarMainServerURI is the env var for the URI of the n8n main instance's
// main server, typically at http://127.0.0.1:5678.
EnvVarMainServerURI = "N8N_MAIN_URI"

// EnVarTaskBrokerServerURI is the env var for the URI of the n8n main
// instance's task broker server, typically at http://127.0.0.1:5679.
EnVarTaskBrokerServerURI = "N8N_TASK_BROKER_URI"

// ------------------------
// runner
// ------------------------

// EnvVarRunnerServerURI is the env var for the URI of the runner's server.
// Used for monitoring the runner's health, typically at http://127.0.0.1:5680.
EnvVarRunnerServerURI = "N8N_RUNNER_URI"

// EnvVarRunnerServerEnabled is the env var for whether the runner's health
// check server should be started.
EnvVarRunnerServerEnabled = "N8N_RUNNERS_SERVER_ENABLED"
ivov marked this conversation as resolved.
Show resolved Hide resolved

// EnvVarIdleTimeout is the env var for how long (in seconds) a runner may be
// idle for before exit.
ivov marked this conversation as resolved.
Show resolved Hide resolved
EnvVarIdleTimeout = "N8N_RUNNERS_AUTO_SHUTDOWN_TIMEOUT"
)

const (
defaultIdleTimeoutValue = "15" // seconds
)

// AllowedOnly filters the current environment down to only those
// environment variables in the allow list.
func AllowedOnly(allowed []string) []string {
Expand Down Expand Up @@ -50,3 +102,67 @@ func Clear(envVars []string, envVarName string) []string {

return result
}

// Config holds validated environment variable values.
type Config struct {
AuthToken string
MainServerURI string
TaskBrokerServerURI string
RunnerServerURI string
}

// FromEnv retrieves vars from the environment, validates their values, and
// returns a Config holding the validated values, or a slice of errors.
func FromEnv() (*Config, error) {
var errs []error

authToken := os.Getenv(EnvVarAuthToken)
mainServerURI := os.Getenv(EnvVarMainServerURI)
taskBrokerServerURI := os.Getenv(EnVarTaskBrokerServerURI)
runnerServerURI := os.Getenv(EnvVarRunnerServerURI)
idleTimeout := os.Getenv(EnvVarIdleTimeout)

if authToken == "" {
errs = append(errs, fmt.Errorf("%s is required", EnvVarAuthToken))
}

if mainServerURI == "" {
errs = append(errs, fmt.Errorf("%s is required", EnvVarMainServerURI))
} else if _, err := url.Parse(mainServerURI); err != nil {
errs = append(errs, fmt.Errorf("%s must be a valid URL: %w", EnvVarMainServerURI, err))
}

if runnerServerURI == "" {
errs = append(errs, fmt.Errorf("%s is required", EnvVarRunnerServerURI))
} else if _, err := url.Parse(runnerServerURI); err != nil {
errs = append(errs, fmt.Errorf("%s must be a valid URL: %w", EnvVarRunnerServerURI, err))
}

if taskBrokerServerURI == "" {
errs = append(errs, fmt.Errorf("%s is required", EnVarTaskBrokerServerURI))
} else if _, err := url.Parse(taskBrokerServerURI); err != nil {
errs = append(errs, fmt.Errorf("%s must be a valid URL: %w", EnVarTaskBrokerServerURI, err))
}

if idleTimeout == "" {
os.Setenv(EnvVarIdleTimeout, defaultIdleTimeoutValue)
ivov marked this conversation as resolved.
Show resolved Hide resolved
} else {
idleTimeoutInt, err := strconv.Atoi(idleTimeout)
if err != nil || idleTimeoutInt < 0 {
errs = append(errs, fmt.Errorf("%s must be a non-negative integer", EnvVarIdleTimeout))
}
}

if len(errs) > 0 {
return nil, errors.Join(errs...)
}

os.Setenv(EnvVarRunnerServerEnabled, "true")

return &Config{
AuthToken: authToken,
MainServerURI: mainServerURI,
TaskBrokerServerURI: taskBrokerServerURI,
RunnerServerURI: runnerServerURI,
}, nil
}
8 changes: 1 addition & 7 deletions internal/http/wait_for_n8n.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,13 @@ package http
import (
"fmt"
"net/http"
"strings"
"task-runner-launcher/internal/logs"
"task-runner-launcher/internal/retry"
"time"
)

func sendReadinessRequest(n8nMainServerURI string) (*http.Response, error) {
baseURL := n8nMainServerURI
if !strings.HasPrefix(n8nMainServerURI, "http://") && !strings.HasPrefix(n8nMainServerURI, "https://") {
baseURL = "http://" + n8nMainServerURI
}

url := fmt.Sprintf("%s/healthz/readiness", baseURL)
url := fmt.Sprintf("%s/healthz/readiness", n8nMainServerURI)

client := &http.Client{
Timeout: 5 * time.Second,
Expand Down