Skip to content
This repository has been archived by the owner on Jan 19, 2023. It is now read-only.

Websocket Streaming Interface #2581

Merged
merged 9 commits into from
Jun 25, 2021
Merged
3 changes: 2 additions & 1 deletion internal/api/action_request_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
ocontext "github.com/vmware-tanzu/octant/internal/context"
"github.com/vmware-tanzu/octant/internal/octant"
"github.com/vmware-tanzu/octant/pkg/action"
"github.com/vmware-tanzu/octant/pkg/api"
)

const (
Expand All @@ -33,7 +34,7 @@ func NewActionRequestManager(dashConfig config.Dash) *ActionRequestManager {
}
}

func (a ActionRequestManager) Start(ctx context.Context, state octant.State, s OctantClient) {
func (a ActionRequestManager) Start(ctx context.Context, state octant.State, s api.OctantClient) {
}

// Handlers returns the handlers this manager supports.
Expand Down
27 changes: 14 additions & 13 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/vmware-tanzu/octant/internal/config"
"github.com/vmware-tanzu/octant/internal/mime"
"github.com/vmware-tanzu/octant/internal/module"
"github.com/vmware-tanzu/octant/pkg/api"
"github.com/vmware-tanzu/octant/pkg/log"
)

Expand All @@ -36,7 +37,7 @@ const (
defaultListenerAddr = "127.0.0.1:7777"
)

func acceptedHosts() []string {
func AcceptedHosts() []string {
hosts := []string{
"localhost",
"127.0.0.1",
Expand Down Expand Up @@ -117,11 +118,11 @@ func RespondWithError(w http.ResponseWriter, code int, message string, logger lo
type API struct {
ctx context.Context
moduleManager module.ManagerInterface
actionDispatcher ActionDispatcher
actionDispatcher api.ActionDispatcher
prefix string
dashConfig config.Dash
logger log.Logger
wsClientManager *WebsocketClientManager
scManager *api.StreamingConnectionManager

modulePaths map[string]module.Module
modules []module.Module
Expand All @@ -131,7 +132,7 @@ type API struct {
var _ Service = (*API)(nil)

// New creates an instance of API.
func New(ctx context.Context, prefix string, actionDispatcher ActionDispatcher, websocketClientManager *WebsocketClientManager, dashConfig config.Dash) *API {
func New(ctx context.Context, prefix string, actionDispatcher api.ActionDispatcher, streamingConnectionManager *api.StreamingConnectionManager, dashConfig config.Dash) *API {
logger := dashConfig.Logger().With("component", "api")
return &API{
ctx: ctx,
Expand All @@ -141,7 +142,7 @@ func New(ctx context.Context, prefix string, actionDispatcher ActionDispatcher,
dashConfig: dashConfig,
logger: logger,
forceUpdateCh: make(chan bool, 1),
wsClientManager: websocketClientManager,
scManager: streamingConnectionManager,
}
}

Expand All @@ -156,11 +157,11 @@ func (a *API) Handler(ctx context.Context) (http.Handler, error) {
return nil, fmt.Errorf("missing dashConfig")
}
router := mux.NewRouter()
router.Use(rebindHandler(ctx, acceptedHosts()))
router.Use(rebindHandler(ctx, AcceptedHosts()))

s := router.PathPrefix(a.prefix).Subrouter()

s.Handle("/stream", websocketService(a.wsClientManager, a.dashConfig))
s.Handle("/stream", streamService(a.scManager, a.dashConfig))

s.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
a.logger.Errorf("api handler not found: %s", r.URL.String())
Expand All @@ -174,10 +175,10 @@ func (a *API) Handler(ctx context.Context) (http.Handler, error) {
type LoadingAPI struct {
ctx context.Context
moduleManager module.ManagerInterface
actionDispatcher ActionDispatcher
actionDispatcher api.ActionDispatcher
prefix string
logger log.Logger
wsClientManager *WebsocketClientManager
scManager *api.StreamingConnectionManager

modulePaths map[string]module.Module
modules []module.Module
Expand All @@ -187,7 +188,7 @@ type LoadingAPI struct {
var _ Service = (*LoadingAPI)(nil)

// NewLoadingAPI creates an instance of LoadingAPI
func NewLoadingAPI(ctx context.Context, prefix string, actionDispatcher ActionDispatcher, websocketClientManager *WebsocketClientManager, logger log.Logger) *LoadingAPI {
func NewLoadingAPI(ctx context.Context, prefix string, actionDispatcher api.ActionDispatcher, websocketClientManager *api.StreamingConnectionManager, logger log.Logger) *LoadingAPI {
logger = logger.With("component", "loading api")
return &LoadingAPI{
ctx: ctx,
Expand All @@ -196,7 +197,7 @@ func NewLoadingAPI(ctx context.Context, prefix string, actionDispatcher ActionDi
modulePaths: make(map[string]module.Module),
logger: logger,
forceUpdateCh: make(chan bool, 1),
wsClientManager: websocketClientManager,
scManager: websocketClientManager,
}
}

Expand All @@ -208,11 +209,11 @@ func (l *LoadingAPI) ForceUpdate() error {
// Handler contains a list of handlers
func (l *LoadingAPI) Handler(ctx context.Context) (http.Handler, error) {
router := mux.NewRouter()
router.Use(rebindHandler(ctx, acceptedHosts()))
router.Use(rebindHandler(ctx, AcceptedHosts()))

s := router.PathPrefix(l.prefix).Subrouter()

s.Handle("/stream", loadingWebsocketService(l.wsClientManager))
s.Handle("/stream", loadingStreamService(l.scManager))

return router, nil
}
10 changes: 6 additions & 4 deletions internal/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/vmware-tanzu/octant/internal/api"
apiFake "github.com/vmware-tanzu/octant/internal/api/fake"
internalAPI "github.com/vmware-tanzu/octant/internal/api"
clusterFake "github.com/vmware-tanzu/octant/internal/cluster/fake"
configFake "github.com/vmware-tanzu/octant/internal/config/fake"
"github.com/vmware-tanzu/octant/internal/log"
"github.com/vmware-tanzu/octant/internal/module"
moduleFake "github.com/vmware-tanzu/octant/internal/module/fake"
"github.com/vmware-tanzu/octant/pkg/api"
apiFake "github.com/vmware-tanzu/octant/pkg/api/fake"
"github.com/vmware-tanzu/octant/pkg/navigation"
"github.com/vmware-tanzu/octant/pkg/view/component"
)
Expand Down Expand Up @@ -95,11 +96,12 @@ func TestAPI_routes(t *testing.T) {
AnyTimes()

actionDispatcher := apiFake.NewMockActionDispatcher(controller)
streamingClientFactory := apiFake.NewMockStreamingClientFactory(controller)

ctx := context.Background()
wsClientManager := api.NewWebsocketClientManager(ctx, actionDispatcher)
scManager := api.NewStreamingConnectionManager(ctx, actionDispatcher, streamingClientFactory)

srv := api.New(ctx, "/", actionDispatcher, wsClientManager, dashConfig)
srv := internalAPI.New(ctx, "/", actionDispatcher, scManager, dashConfig)

handler, err := srv.Handler(ctx)
require.NoError(t, err)
Expand Down
5 changes: 3 additions & 2 deletions internal/api/container_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"sync"
"time"

"github.com/vmware-tanzu/octant/pkg/api"
"github.com/vmware-tanzu/octant/pkg/event"

"github.com/vmware-tanzu/octant/internal/gvk"
Expand All @@ -36,7 +37,7 @@ const (
)

type podLogsStateManager struct {
client OctantClient
client api.OctantClient
config config.Dash
ctx context.Context

Expand Down Expand Up @@ -145,7 +146,7 @@ func (s *podLogsStateManager) StreamPodLogsUnsubscribe(_ octant.State, payload a
return nil
}

func (s *podLogsStateManager) Start(ctx context.Context, _ octant.State, client OctantClient) {
func (s *podLogsStateManager) Start(ctx context.Context, _ octant.State, client api.OctantClient) {
s.client = client
s.ctx = ctx
}
Expand Down
3 changes: 2 additions & 1 deletion internal/api/container_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/vmware-tanzu/octant/pkg/api"
"github.com/vmware-tanzu/octant/pkg/event"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -106,7 +107,7 @@ type octantClient struct {
stopCh chan struct{}
}

var _ OctantClient = &octantClient{}
var _ api.OctantClient = &octantClient{}

func newOctantClient() *octantClient {
return &octantClient{
Expand Down
5 changes: 3 additions & 2 deletions internal/api/content_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/vmware-tanzu/octant/internal/config"
ocontext "github.com/vmware-tanzu/octant/internal/context"
"github.com/vmware-tanzu/octant/pkg/api"
oevent "github.com/vmware-tanzu/octant/pkg/event"

oerrors "github.com/vmware-tanzu/octant/internal/errors"
Expand Down Expand Up @@ -107,7 +108,7 @@ func NewContentManager(moduleManager module.ManagerInterface, dashConfig config.
var _ StateManager = (*ContentManager)(nil)

// Start starts the manager.
func (cm *ContentManager) Start(ctx context.Context, state octant.State, s OctantClient) {
func (cm *ContentManager) Start(ctx context.Context, state octant.State, s api.OctantClient) {
cm.ctx = ctx
logger := internalLog.From(ctx)
logger.Debugf("starting content manager")
Expand All @@ -132,7 +133,7 @@ func (cm *ContentManager) Start(ctx context.Context, state octant.State, s Octan
cm.poller.Run(ctx, cm.updateContentCh, cm.runUpdate(state, s), event.DefaultScheduleDelay)
}

func (cm *ContentManager) runUpdate(state octant.State, s OctantClient) PollerFunc {
func (cm *ContentManager) runUpdate(state octant.State, s api.OctantClient) PollerFunc {
previousChecksum := ""

return func(ctx context.Context) bool {
Expand Down
16 changes: 15 additions & 1 deletion internal/api/content_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ package api_test

import (
"context"
"sort"
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/vmware-tanzu/octant/internal/api"
"github.com/vmware-tanzu/octant/internal/api/fake"
configFake "github.com/vmware-tanzu/octant/internal/config/fake"
ocontext "github.com/vmware-tanzu/octant/internal/context"
"github.com/vmware-tanzu/octant/internal/log"
Expand All @@ -22,6 +23,7 @@ import (
"github.com/vmware-tanzu/octant/internal/octant"
octantFake "github.com/vmware-tanzu/octant/internal/octant/fake"
"github.com/vmware-tanzu/octant/pkg/action"
"github.com/vmware-tanzu/octant/pkg/api/fake"
"github.com/vmware-tanzu/octant/pkg/navigation"
"github.com/vmware-tanzu/octant/pkg/view/component"
)
Expand Down Expand Up @@ -209,3 +211,15 @@ func TestContentManager_SetQueryParams(t *testing.T) {
})
}
}

func AssertHandlers(t *testing.T, manager api.StateManager, expected []string) {
handlers := manager.Handlers()
var got []string
for _, h := range handlers {
got = append(got, h.RequestType)
}
sort.Strings(got)
sort.Strings(expected)

assert.Equal(t, expected, got)
}
5 changes: 3 additions & 2 deletions internal/api/context_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/vmware-tanzu/octant/internal/util/json"

"github.com/vmware-tanzu/octant/pkg/api"
oevent "github.com/vmware-tanzu/octant/pkg/event"

"github.com/pkg/errors"
Expand Down Expand Up @@ -89,12 +90,12 @@ func (c *ContextManager) SetContext(state octant.State, payload action.Payload)
}

// Start starts the manager.
func (c *ContextManager) Start(ctx context.Context, state octant.State, s OctantClient) {
func (c *ContextManager) Start(ctx context.Context, state octant.State, s api.OctantClient) {
c.poller.Run(ctx, nil, c.runUpdate(state, s), event.DefaultScheduleDelay)
c.ctx = ctx
}

func (c *ContextManager) runUpdate(state octant.State, s OctantClient) PollerFunc {
func (c *ContextManager) runUpdate(state octant.State, s api.OctantClient) PollerFunc {
var previous []byte

logger := c.dashConfig.Logger()
Expand Down
2 changes: 1 addition & 1 deletion internal/api/context_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ import (
"github.com/golang/mock/gomock"

"github.com/vmware-tanzu/octant/internal/api"
"github.com/vmware-tanzu/octant/internal/api/fake"
configFake "github.com/vmware-tanzu/octant/internal/config/fake"
"github.com/vmware-tanzu/octant/internal/log"
"github.com/vmware-tanzu/octant/internal/octant"
octantFake "github.com/vmware-tanzu/octant/internal/octant/fake"
"github.com/vmware-tanzu/octant/pkg/api/fake"
)

func TestContextManager_Handlers(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion internal/api/filter_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/vmware-tanzu/octant/internal/octant"
"github.com/vmware-tanzu/octant/pkg/action"
"github.com/vmware-tanzu/octant/pkg/api"
)

const (
Expand All @@ -37,7 +38,7 @@ func NewFilterManager() *FilterManager {
}

// Start starts the manager. Current is a no-op.
func (fm *FilterManager) Start(ctx context.Context, state octant.State, s OctantClient) {
func (fm *FilterManager) Start(ctx context.Context, state octant.State, s api.OctantClient) {
fm.ctx = ctx
}

Expand Down
7 changes: 4 additions & 3 deletions internal/api/helper_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package api
import (
"context"

"github.com/vmware-tanzu/octant/pkg/api"
oevent "github.com/vmware-tanzu/octant/pkg/event"

"github.com/vmware-tanzu/octant/internal/config"
Expand Down Expand Up @@ -43,7 +44,7 @@ type HelperStateManager struct {
poller Poller
}

var _StateManager = (*HelperStateManager)(nil)
var _ StateManager = (*HelperStateManager)(nil)

// NewHelperStateManager creates an instance of HelperStateManager
func NewHelperStateManager(dashConfig config.Dash, options ...HelperStateManagerOption) *HelperStateManager {
Expand All @@ -67,11 +68,11 @@ func (h *HelperStateManager) Handlers() []octant.ClientRequestHandler {
}

// Start starts the manager
func (h *HelperStateManager) Start(ctx context.Context, state octant.State, client OctantClient) {
func (h *HelperStateManager) Start(ctx context.Context, state octant.State, client api.OctantClient) {
h.poller.Run(ctx, nil, h.runUpdate(state, client), event.DefaultScheduleDelay)
}

func (h *HelperStateManager) runUpdate(state octant.State, client OctantClient) PollerFunc {
func (h *HelperStateManager) runUpdate(state octant.State, client api.OctantClient) PollerFunc {
var buildInfoGenerated, kubeConfigPathGenerated bool

return func(ctx context.Context) bool {
Expand Down
2 changes: 1 addition & 1 deletion internal/api/helper_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import (
"github.com/golang/mock/gomock"

"github.com/vmware-tanzu/octant/internal/api"
"github.com/vmware-tanzu/octant/internal/api/fake"
configFake "github.com/vmware-tanzu/octant/internal/config/fake"
"github.com/vmware-tanzu/octant/internal/log"
octantFake "github.com/vmware-tanzu/octant/internal/octant/fake"
"github.com/vmware-tanzu/octant/pkg/api/fake"
)

func TestHelperManager_GenerateContent(t *testing.T) {
Expand Down
Loading