Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add endpoint circuit breaker #2120

Merged
merged 52 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from 44 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
901c27f
feat: setup circuit breaker
jirevwe Aug 12, 2024
fc355f9
feat: add circuit breaker switching implementation
jirevwe Aug 16, 2024
6f691e8
feat: add circuit breaker configuration
jirevwe Aug 19, 2024
a4fc664
feat: add circuit breaker configuration
jirevwe Aug 19, 2024
528ef30
feat: remove db depndency and add function to fetch breaker state
jirevwe Aug 21, 2024
feb2b22
feat: rename sample_time to sample rate; set reasonable defaults when…
jirevwe Aug 21, 2024
e96034e
feat: add circuit breaker to instance config
jirevwe Aug 21, 2024
be3208e
feat: move half-open decision after other checks have been made, so w…
jirevwe Aug 22, 2024
1081b3c
feat: add index to migration
jirevwe Aug 22, 2024
9dba343
feat: personal review first pass; add func to validate custom config;…
jirevwe Aug 22, 2024
848559e
feat: personal review second pass; added CircuitBreakerManager.CanExe…
jirevwe Aug 23, 2024
2880b8c
feat: personal review third pass; use strings builder to build config…
jirevwe Aug 23, 2024
e268fa4
feat: use deadline context for redis operations to prevent deadlock; …
jirevwe Aug 28, 2024
d9a7dd3
feat: split package into separate files, add tests for each component
jirevwe Aug 30, 2024
7080d15
chore: update config tests
jirevwe Aug 30, 2024
9c7c646
chore: update tests
jirevwe Sep 2, 2024
1efcafe
chore: implement distributed lock using redlock
jirevwe Sep 2, 2024
afd3878
merge with main
jirevwe Sep 5, 2024
3c01da3
Merge branch 'main' into raymond/feat/add-circuit-breaker
jirevwe Sep 5, 2024
842080c
chore: fix lint error
jirevwe Sep 5, 2024
abe8ada
chore: fix lint errors
jirevwe Sep 5, 2024
97f427c
chore: fix test
jirevwe Sep 6, 2024
71bb297
patch: remove concurrent write path
jirevwe Sep 16, 2024
ea55312
feat: put the circuit breaker behind the license;
jirevwe Sep 17, 2024
85f4702
feat: Change FailureThreshold to an int; change ErrorTimeout to Break…
jirevwe Sep 17, 2024
9812d53
feat: put circuit breaker behind a feature flag
jirevwe Sep 17, 2024
370385b
Merge branch 'main' into raymond/feat/add-circuit-breaker
jirevwe Sep 17, 2024
5eae27e
chore: update tests
jirevwe Sep 18, 2024
ed42554
chore: fix go lint
jirevwe Sep 18, 2024
6ba935c
feat: refactor the sample query to discard entries that occurred befo…
jirevwe Sep 20, 2024
2921089
feat: make the success threshold a percentage; add success rate to th…
jirevwe Sep 24, 2024
96ed4b8
fix: fix lint error
jirevwe Sep 24, 2024
5b2c43b
chore: fix posthog js version
jirevwe Sep 25, 2024
02563e8
Merge branch 'main' into raymond/feat/add-circuit-breaker
jirevwe Sep 27, 2024
ba678bd
chore: PR review changes
jirevwe Sep 27, 2024
e0ecd07
feat: add circuit breaker metrics
jirevwe Sep 27, 2024
9c7490b
feat: add tenant id
jirevwe Sep 30, 2024
f539317
chore: add interval variable back
jirevwe Sep 30, 2024
11de670
feat: Handle disabling endpoint correctly; Add debug logs for circuit…
jirevwe Oct 1, 2024
cace946
chore: fix tests
jirevwe Oct 1, 2024
9a4b940
chore: remove break
jirevwe Oct 1, 2024
6059d92
feat: add failure rates to endpoint api response
jirevwe Oct 2, 2024
a6a5ba7
ui cleanups
Oluwadaminiola Oct 2, 2024
e8f1a9a
feat: add activate endpoint
jirevwe Oct 2, 2024
8e35724
feat: update notification templates
jirevwe Oct 3, 2024
c1e6e99
feat: remove NotificationThresholds
jirevwe Oct 3, 2024
276743a
feat: fix edge case; use logger with log level
jirevwe Oct 4, 2024
d4347bf
merge with main
jirevwe Oct 9, 2024
9ab0419
deduplicate how circuit breakers are loaded from the store
jirevwe Oct 9, 2024
4ef0810
feat: fix distributed locking mechanism; add sample latency metric
jirevwe Oct 9, 2024
6052ee8
feat: lint fixes
jirevwe Oct 9, 2024
1ebbaee
feat: add time at the end of the sample window
jirevwe Oct 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ func (a *ApplicationHandler) BuildControlPlaneRoutes() *chi.Mux {
e.With(handler.RequireEnabledProject()).Delete("/", handler.DeleteEndpoint)
e.With(handler.RequireEnabledProject()).Put("/expire_secret", handler.ExpireSecret)
e.With(handler.RequireEnabledProject()).Put("/pause", handler.PauseEndpoint)
e.With(handler.RequireEnabledProject()).Post("/activate", handler.ActivateEndpoint)
})
})

Expand Down
26 changes: 13 additions & 13 deletions api/handlers/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,24 @@ import (
)

func (h *Handler) GetConfiguration(w http.ResponseWriter, r *http.Request) {
config, err := postgres.NewConfigRepo(h.A.DB).LoadConfiguration(r.Context())
configuration, err := postgres.NewConfigRepo(h.A.DB).LoadConfiguration(r.Context())
if err != nil && !errors.Is(err, datastore.ErrConfigNotFound) {
_ = render.Render(w, r, util.NewServiceErrResponse(err))
return
}

configResponse := []*models.ConfigurationResponse{}
if config != nil {
if config.StoragePolicy.Type == datastore.S3 {
var configResponse []*models.ConfigurationResponse
if configuration != nil {
if configuration.StoragePolicy.Type == datastore.S3 {
policy := &datastore.S3Storage{}
policy.Bucket = config.StoragePolicy.S3.Bucket
policy.Endpoint = config.StoragePolicy.S3.Endpoint
policy.Region = config.StoragePolicy.S3.Region
config.StoragePolicy.S3 = policy
policy.Bucket = configuration.StoragePolicy.S3.Bucket
policy.Endpoint = configuration.StoragePolicy.S3.Endpoint
policy.Region = configuration.StoragePolicy.S3.Region
configuration.StoragePolicy.S3 = policy
}

c := &models.ConfigurationResponse{
Configuration: config,
Configuration: configuration,
ApiVersion: convoy.GetVersion(),
}

Expand All @@ -61,14 +61,14 @@ func (h *Handler) CreateConfiguration(w http.ResponseWriter, r *http.Request) {
NewConfig: &newConfig,
}

config, err := cc.Run(r.Context())
configuration, err := cc.Run(r.Context())
if err != nil {
_ = render.Render(w, r, util.NewServiceErrResponse(err))
return
}

c := &models.ConfigurationResponse{
Configuration: config,
Configuration: configuration,
ApiVersion: convoy.GetVersion(),
}

Expand All @@ -92,14 +92,14 @@ func (h *Handler) UpdateConfiguration(w http.ResponseWriter, r *http.Request) {
Config: &newConfig,
}

config, err := uc.Run(r.Context())
configuration, err := uc.Run(r.Context())
if err != nil {
_ = render.Render(w, r, util.NewServiceErrResponse(err))
return
}

c := &models.ConfigurationResponse{
Configuration: config,
Configuration: configuration,
ApiVersion: convoy.GetVersion(),
}

Expand Down
84 changes: 83 additions & 1 deletion api/handlers/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package handlers
import (
"context"
"encoding/json"
"fmt"
"github.com/frain-dev/convoy/pkg/circuit_breaker"
"github.com/frain-dev/convoy/pkg/msgpack"
"net/http"

"github.com/frain-dev/convoy/api/models"
Expand Down Expand Up @@ -207,9 +210,37 @@ func (h *Handler) GetEndpoints(w http.ResponseWriter, r *http.Request) {
return
}

// fetch keys from redis and mutate endpoints slice
keys := make([]string, len(endpoints))
for i := 0; i < len(endpoints); i++ {
keys[i] = fmt.Sprintf("breaker:%s", endpoints[i].UID)
}

cbs, err := h.A.Redis.MGet(r.Context(), keys...).Result()
if err != nil {
_ = render.Render(w, r, util.NewServiceErrResponse(err))
return
}

for i := 0; i < len(cbs); i++ {
if cbs[i] != nil {
str, ok := cbs[i].(string)
if ok {
var c circuit_breaker.CircuitBreaker
asBytes := []byte(str)
innerErr := msgpack.DecodeMsgPack(asBytes, &c)
if innerErr != nil {
continue
}
endpoints[i].FailureRate = c.FailureRate
}
}
}

resp := models.NewListResponse(endpoints, func(endpoint datastore.Endpoint) models.EndpointResponse {
return models.EndpointResponse{Endpoint: &endpoint}
})

serverResponse := util.NewServerResponse(
"Endpoints fetched successfully",
models.PagedResponse{Content: &resp, Pagination: &paginationData}, http.StatusOK)
Expand Down Expand Up @@ -410,7 +441,7 @@ func (h *Handler) ExpireSecret(w http.ResponseWriter, r *http.Request) {
// PauseEndpoint
//
// @Summary Pause endpoint
// @Description This endpoint toggles an endpoint status between the active and paused states
// @Description Toggles an endpoint's status between active and paused states
// @Id PauseEndpoint
// @Tags Endpoints
// @Accept json
Expand Down Expand Up @@ -458,6 +489,57 @@ func (h *Handler) PauseEndpoint(w http.ResponseWriter, r *http.Request) {
util.WriteResponse(w, r, resBytes, http.StatusAccepted)
}

// ActivateEndpoint
//
// @Summary Activate endpoint
// @Description Activated an inactive endpoint
// @Id PauseEndpoint
// @Tags Endpoints
// @Accept json
// @Produce json
// @Param projectID path string true "Project ID"
// @Param endpointID path string true "Endpoint ID"
// @Success 202 {object} util.ServerResponse{data=models.EndpointResponse}
// @Failure 400,401,404 {object} util.ServerResponse{data=Stub}
// @Security ApiKeyAuth
// @Router /v1/projects/{projectID}/endpoints/{endpointID}/activate [post]
func (h *Handler) ActivateEndpoint(w http.ResponseWriter, r *http.Request) {
project, err := h.retrieveProject(r)
if err != nil {
_ = render.Render(w, r, util.NewErrorResponse(err.Error(), http.StatusBadRequest))
return
}

aes := services.ActivateEndpointService{
EndpointRepo: postgres.NewEndpointRepo(h.A.DB, h.A.Cache),
ProjectID: project.UID,
EndpointId: chi.URLParam(r, "endpointID"),
}

endpoint, err := aes.Run(r.Context())
if err != nil {
_ = render.Render(w, r, util.NewServiceErrResponse(err))
return
}

resp := &models.EndpointResponse{Endpoint: endpoint}
serverResponse := util.NewServerResponse("endpoint status successfully activated", resp, http.StatusAccepted)

rb, err := json.Marshal(serverResponse)
if err != nil {
_ = render.Render(w, r, util.NewServiceErrResponse(err))
return
}

resBytes, err := h.RM.VersionResponse(r, rb, "UpdateEndpoint")
if err != nil {
_ = render.Render(w, r, util.NewServiceErrResponse(err))
return
}

util.WriteResponse(w, r, resBytes, http.StatusAccepted)
}

func (h *Handler) retrieveEndpoint(ctx context.Context, endpointID, projectID string) (*datastore.Endpoint, error) {
endpointRepo := postgres.NewEndpointRepo(h.A.DB, h.A.Cache)
return endpointRepo.FindEndpointByID(ctx, endpointID, projectID)
Expand Down
2 changes: 1 addition & 1 deletion api/models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ type PortalLinkResponse struct {
DeletedAt null.Time `json:"deleted_at,omitempty"`
}

// NewListResponse is generic function for looping over
// NewListResponse is a generic function for looping over
// a slice of type M and returning a slice of type T
func NewListResponse[T, M any](items []M, fn func(item M) T) []T {
results := make([]T, 0)
Expand Down
3 changes: 3 additions & 0 deletions api/server_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,13 @@ func buildServer() *ApplicationHandler {
noopCache := ncache.NewNoopCache()
r, _ := rlimiter.NewRedisLimiter(cfg.Redis.BuildDsn())

rd, _ := rdb.NewClient(cfg.Redis.BuildDsn())

ah, _ := NewApplicationHandler(
&types.APIOptions{
DB: db,
Queue: newQueue,
Redis: rd.Client(),
Logger: logger,
Cache: noopCache,
Rate: r,
Expand Down
16 changes: 9 additions & 7 deletions api/testdb/seed.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,21 +557,23 @@ func SeedUser(db database.Database, email, password string) (*datastore.User, er
}

func SeedConfiguration(db database.Database) (*datastore.Configuration, error) {
config := &datastore.Configuration{
UID: ulid.Make().String(),
IsAnalyticsEnabled: true,
IsSignupEnabled: true,
StoragePolicy: &datastore.DefaultStoragePolicy,
c := &datastore.Configuration{
UID: ulid.Make().String(),
IsAnalyticsEnabled: true,
IsSignupEnabled: true,
StoragePolicy: &datastore.DefaultStoragePolicy,
RetentionPolicy: &datastore.DefaultRetentionPolicy,
CircuitBreakerConfig: &datastore.DefaultCircuitBreakerConfiguration,
}

// Seed Data
configRepo := postgres.NewConfigRepo(db)
err := configRepo.CreateConfiguration(context.TODO(), config)
err := configRepo.CreateConfiguration(context.TODO(), c)
if err != nil {
return nil, err
}

return config, nil
return c, nil
}

func SeedDevice(db database.Database, g *datastore.Project, endpointID string) error {
Expand Down
2 changes: 2 additions & 0 deletions api/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (
"github.com/frain-dev/convoy/internal/pkg/limiter"
"github.com/frain-dev/convoy/pkg/log"
"github.com/frain-dev/convoy/queue"
"github.com/redis/go-redis/v9"
)

type ContextKey string

type APIOptions struct {
FFlag *fflag.FFlag
DB database.Database
Redis redis.UniversalClient
Queue queue.Queuer
Logger log.StdLogger
Cache cache.Cache
Expand Down
27 changes: 12 additions & 15 deletions cmd/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package agent

import (
"context"
"fmt"
"os"
"os/signal"
"time"
Expand Down Expand Up @@ -114,38 +113,35 @@ func AddAgentCommand(a *cli.App) *cobra.Command {
cmd.Flags().Uint32Var(&workerPort, "worker-port", 0, "Worker port")
cmd.Flags().Uint32Var(&ingestPort, "ingest-port", 0, "Ingest port")

cmd.Flags().StringVar(&logLevel, "log-level", "", "scheduler log level")
cmd.Flags().StringVar(&logLevel, "log-level", "", "Log level")
cmd.Flags().IntVar(&consumerPoolSize, "consumers", -1, "Size of the consumers pool.")
cmd.Flags().IntVar(&interval, "interval", 10, "the time interval, measured in seconds to update the in-memory store from the database")
cmd.Flags().StringVar(&executionMode, "mode", "", "Execution Mode (one of events, retry and default)")

return cmd
}

func startServerComponent(ctx context.Context, a *cli.App) error {
func startServerComponent(_ context.Context, a *cli.App) error {
lo := a.Logger.(*log.Logger)
lo.SetPrefix("agent")

cfg, err := config.Get()
if err != nil {
a.Logger.WithError(err).Fatal("Failed to load configuration")
lo.WithError(err).Fatal("Failed to load configuration")
}

start := time.Now()
a.Logger.Info("Starting Convoy data plane ...")
lo.Info("Starting Convoy data plane")

apiKeyRepo := postgres.NewAPIKeyRepo(a.DB, a.Cache)
userRepo := postgres.NewUserRepo(a.DB, a.Cache)
portalLinkRepo := postgres.NewPortalLinkRepo(a.DB, a.Cache)
err = realm_chain.Init(&cfg.Auth, apiKeyRepo, userRepo, portalLinkRepo, a.Cache)
if err != nil {
a.Logger.WithError(err).Fatal("failed to initialize realm chain")
}

flag, err := fflag.NewFFlag(&cfg)
if err != nil {
a.Logger.WithError(err).Fatal("failed to create fflag controller")
lo.WithError(err).Fatal("failed to initialize realm chain")
}

lo := a.Logger.(*log.Logger)
lo.SetPrefix("api server")
flag := fflag.NewFFlag(&cfg)

lvl, err := log.ParseLevel(cfg.Logger.Level)
if err != nil {
Expand All @@ -164,6 +160,7 @@ func startServerComponent(ctx context.Context, a *cli.App) error {
Logger: lo,
Cache: a.Cache,
Rate: a.Rate,
Redis: a.Redis,
Licenser: a.Licenser,
})
if err != nil {
Expand All @@ -172,7 +169,7 @@ func startServerComponent(ctx context.Context, a *cli.App) error {

srv.SetHandler(evHandler.BuildDataPlaneRoutes())

fmt.Printf("Started convoy server in %s\n", time.Since(start))
lo.Infof("Started convoy server in %s", time.Since(start))

httpConfig := cfg.Server.HTTP
if httpConfig.SSL {
Expand All @@ -181,7 +178,7 @@ func startServerComponent(ctx context.Context, a *cli.App) error {
return nil
}

fmt.Printf("Starting Convoy Agent on port %v\n", cfg.Server.HTTP.AgentPort)
lo.Infof("Starting Convoy Agent on port %v", cfg.Server.HTTP.AgentPort)

go func() {
srv.Listen()
Expand Down
6 changes: 2 additions & 4 deletions cmd/ff/feature_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@ func AddFeatureFlagsCommand() *cobra.Command {
if err != nil {
log.WithError(err).Fatal("Error fetching the config.")
}
f, err := fflag2.NewFFlag(&cfg)
if err != nil {
return err
}

f := fflag2.NewFFlag(&cfg)
return f.ListFeatures()
},
PersistentPostRun: func(cmd *cobra.Command, args []string) {},
Expand Down
Loading
Loading