diff --git a/internal/api/action_request_manager.go b/internal/api/action_request_manager.go index 6d28aef787..82c6c73f40 100644 --- a/internal/api/action_request_manager.go +++ b/internal/api/action_request_manager.go @@ -12,6 +12,7 @@ import ( ocontext "github.com/vmware-tanzu/octant/internal/context" "github.com/vmware-tanzu/octant/internal/octant" "github.com/vmware-tanzu/octant/pkg/action" + "github.com/vmware-tanzu/octant/pkg/api" ) const ( @@ -33,7 +34,7 @@ func NewActionRequestManager(dashConfig config.Dash) *ActionRequestManager { } } -func (a ActionRequestManager) Start(ctx context.Context, state octant.State, s OctantClient) { +func (a ActionRequestManager) Start(ctx context.Context, state octant.State, s api.OctantClient) { } // Handlers returns the handlers this manager supports. diff --git a/internal/api/api.go b/internal/api/api.go index 32a67d22be..070b827935 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -22,6 +22,7 @@ import ( "github.com/vmware-tanzu/octant/internal/config" "github.com/vmware-tanzu/octant/internal/mime" "github.com/vmware-tanzu/octant/internal/module" + "github.com/vmware-tanzu/octant/pkg/api" "github.com/vmware-tanzu/octant/pkg/log" ) @@ -36,7 +37,7 @@ const ( defaultListenerAddr = "127.0.0.1:7777" ) -func acceptedHosts() []string { +func AcceptedHosts() []string { hosts := []string{ "localhost", "127.0.0.1", @@ -117,11 +118,11 @@ func RespondWithError(w http.ResponseWriter, code int, message string, logger lo type API struct { ctx context.Context moduleManager module.ManagerInterface - actionDispatcher ActionDispatcher + actionDispatcher api.ActionDispatcher prefix string dashConfig config.Dash logger log.Logger - wsClientManager *WebsocketClientManager + scManager *api.StreamingConnectionManager modulePaths map[string]module.Module modules []module.Module @@ -131,7 +132,7 @@ type API struct { var _ Service = (*API)(nil) // New creates an instance of API. -func New(ctx context.Context, prefix string, actionDispatcher ActionDispatcher, websocketClientManager *WebsocketClientManager, dashConfig config.Dash) *API { +func New(ctx context.Context, prefix string, actionDispatcher api.ActionDispatcher, streamingConnectionManager *api.StreamingConnectionManager, dashConfig config.Dash) *API { logger := dashConfig.Logger().With("component", "api") return &API{ ctx: ctx, @@ -141,7 +142,7 @@ func New(ctx context.Context, prefix string, actionDispatcher ActionDispatcher, dashConfig: dashConfig, logger: logger, forceUpdateCh: make(chan bool, 1), - wsClientManager: websocketClientManager, + scManager: streamingConnectionManager, } } @@ -156,11 +157,11 @@ func (a *API) Handler(ctx context.Context) (http.Handler, error) { return nil, fmt.Errorf("missing dashConfig") } router := mux.NewRouter() - router.Use(rebindHandler(ctx, acceptedHosts())) + router.Use(rebindHandler(ctx, AcceptedHosts())) s := router.PathPrefix(a.prefix).Subrouter() - s.Handle("/stream", websocketService(a.wsClientManager, a.dashConfig)) + s.Handle("/stream", streamService(a.scManager, a.dashConfig)) s.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { a.logger.Errorf("api handler not found: %s", r.URL.String()) @@ -174,10 +175,10 @@ func (a *API) Handler(ctx context.Context) (http.Handler, error) { type LoadingAPI struct { ctx context.Context moduleManager module.ManagerInterface - actionDispatcher ActionDispatcher + actionDispatcher api.ActionDispatcher prefix string logger log.Logger - wsClientManager *WebsocketClientManager + scManager *api.StreamingConnectionManager modulePaths map[string]module.Module modules []module.Module @@ -187,7 +188,7 @@ type LoadingAPI struct { var _ Service = (*LoadingAPI)(nil) // NewLoadingAPI creates an instance of LoadingAPI -func NewLoadingAPI(ctx context.Context, prefix string, actionDispatcher ActionDispatcher, websocketClientManager *WebsocketClientManager, logger log.Logger) *LoadingAPI { +func NewLoadingAPI(ctx context.Context, prefix string, actionDispatcher api.ActionDispatcher, websocketClientManager *api.StreamingConnectionManager, logger log.Logger) *LoadingAPI { logger = logger.With("component", "loading api") return &LoadingAPI{ ctx: ctx, @@ -196,7 +197,7 @@ func NewLoadingAPI(ctx context.Context, prefix string, actionDispatcher ActionDi modulePaths: make(map[string]module.Module), logger: logger, forceUpdateCh: make(chan bool, 1), - wsClientManager: websocketClientManager, + scManager: websocketClientManager, } } @@ -208,11 +209,11 @@ func (l *LoadingAPI) ForceUpdate() error { // Handler contains a list of handlers func (l *LoadingAPI) Handler(ctx context.Context) (http.Handler, error) { router := mux.NewRouter() - router.Use(rebindHandler(ctx, acceptedHosts())) + router.Use(rebindHandler(ctx, AcceptedHosts())) s := router.PathPrefix(l.prefix).Subrouter() - s.Handle("/stream", loadingWebsocketService(l.wsClientManager)) + s.Handle("/stream", loadingStreamService(l.scManager)) return router, nil } diff --git a/internal/api/api_test.go b/internal/api/api_test.go index f3ec31fc39..7cf5a30901 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -20,13 +20,14 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/vmware-tanzu/octant/internal/api" - apiFake "github.com/vmware-tanzu/octant/internal/api/fake" + internalAPI "github.com/vmware-tanzu/octant/internal/api" clusterFake "github.com/vmware-tanzu/octant/internal/cluster/fake" configFake "github.com/vmware-tanzu/octant/internal/config/fake" "github.com/vmware-tanzu/octant/internal/log" "github.com/vmware-tanzu/octant/internal/module" moduleFake "github.com/vmware-tanzu/octant/internal/module/fake" + "github.com/vmware-tanzu/octant/pkg/api" + apiFake "github.com/vmware-tanzu/octant/pkg/api/fake" "github.com/vmware-tanzu/octant/pkg/navigation" "github.com/vmware-tanzu/octant/pkg/view/component" ) @@ -95,11 +96,12 @@ func TestAPI_routes(t *testing.T) { AnyTimes() actionDispatcher := apiFake.NewMockActionDispatcher(controller) + streamingClientFactory := apiFake.NewMockStreamingClientFactory(controller) ctx := context.Background() - wsClientManager := api.NewWebsocketClientManager(ctx, actionDispatcher) + scManager := api.NewStreamingConnectionManager(ctx, actionDispatcher, streamingClientFactory) - srv := api.New(ctx, "/", actionDispatcher, wsClientManager, dashConfig) + srv := internalAPI.New(ctx, "/", actionDispatcher, scManager, dashConfig) handler, err := srv.Handler(ctx) require.NoError(t, err) diff --git a/internal/api/container_logs.go b/internal/api/container_logs.go index d03518abfb..ca4c6e9dfc 100644 --- a/internal/api/container_logs.go +++ b/internal/api/container_logs.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "github.com/vmware-tanzu/octant/pkg/api" "github.com/vmware-tanzu/octant/pkg/event" "github.com/vmware-tanzu/octant/internal/gvk" @@ -36,7 +37,7 @@ const ( ) type podLogsStateManager struct { - client OctantClient + client api.OctantClient config config.Dash ctx context.Context @@ -145,7 +146,7 @@ func (s *podLogsStateManager) StreamPodLogsUnsubscribe(_ octant.State, payload a return nil } -func (s *podLogsStateManager) Start(ctx context.Context, _ octant.State, client OctantClient) { +func (s *podLogsStateManager) Start(ctx context.Context, _ octant.State, client api.OctantClient) { s.client = client s.ctx = ctx } diff --git a/internal/api/container_logs_test.go b/internal/api/container_logs_test.go index 0882a0579e..4ed7f433f8 100644 --- a/internal/api/container_logs_test.go +++ b/internal/api/container_logs_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/vmware-tanzu/octant/pkg/api" "github.com/vmware-tanzu/octant/pkg/event" "github.com/golang/mock/gomock" @@ -106,7 +107,7 @@ type octantClient struct { stopCh chan struct{} } -var _ OctantClient = &octantClient{} +var _ api.OctantClient = &octantClient{} func newOctantClient() *octantClient { return &octantClient{ diff --git a/internal/api/content_manager.go b/internal/api/content_manager.go index 14bde8c94e..c7f84e473b 100644 --- a/internal/api/content_manager.go +++ b/internal/api/content_manager.go @@ -18,6 +18,7 @@ import ( "github.com/vmware-tanzu/octant/internal/config" ocontext "github.com/vmware-tanzu/octant/internal/context" + "github.com/vmware-tanzu/octant/pkg/api" oevent "github.com/vmware-tanzu/octant/pkg/event" oerrors "github.com/vmware-tanzu/octant/internal/errors" @@ -107,7 +108,7 @@ func NewContentManager(moduleManager module.ManagerInterface, dashConfig config. var _ StateManager = (*ContentManager)(nil) // Start starts the manager. -func (cm *ContentManager) Start(ctx context.Context, state octant.State, s OctantClient) { +func (cm *ContentManager) Start(ctx context.Context, state octant.State, s api.OctantClient) { cm.ctx = ctx logger := internalLog.From(ctx) logger.Debugf("starting content manager") @@ -132,7 +133,7 @@ func (cm *ContentManager) Start(ctx context.Context, state octant.State, s Octan cm.poller.Run(ctx, cm.updateContentCh, cm.runUpdate(state, s), event.DefaultScheduleDelay) } -func (cm *ContentManager) runUpdate(state octant.State, s OctantClient) PollerFunc { +func (cm *ContentManager) runUpdate(state octant.State, s api.OctantClient) PollerFunc { previousChecksum := "" return func(ctx context.Context) bool { diff --git a/internal/api/content_manager_test.go b/internal/api/content_manager_test.go index 9220333c92..684bb7ec3b 100644 --- a/internal/api/content_manager_test.go +++ b/internal/api/content_manager_test.go @@ -7,13 +7,14 @@ package api_test import ( "context" + "sort" "testing" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/vmware-tanzu/octant/internal/api" - "github.com/vmware-tanzu/octant/internal/api/fake" configFake "github.com/vmware-tanzu/octant/internal/config/fake" ocontext "github.com/vmware-tanzu/octant/internal/context" "github.com/vmware-tanzu/octant/internal/log" @@ -22,6 +23,7 @@ import ( "github.com/vmware-tanzu/octant/internal/octant" octantFake "github.com/vmware-tanzu/octant/internal/octant/fake" "github.com/vmware-tanzu/octant/pkg/action" + "github.com/vmware-tanzu/octant/pkg/api/fake" "github.com/vmware-tanzu/octant/pkg/navigation" "github.com/vmware-tanzu/octant/pkg/view/component" ) @@ -209,3 +211,15 @@ func TestContentManager_SetQueryParams(t *testing.T) { }) } } + +func AssertHandlers(t *testing.T, manager api.StateManager, expected []string) { + handlers := manager.Handlers() + var got []string + for _, h := range handlers { + got = append(got, h.RequestType) + } + sort.Strings(got) + sort.Strings(expected) + + assert.Equal(t, expected, got) +} diff --git a/internal/api/context_manager.go b/internal/api/context_manager.go index 47287474f4..7726da26cf 100644 --- a/internal/api/context_manager.go +++ b/internal/api/context_manager.go @@ -11,6 +11,7 @@ import ( "github.com/vmware-tanzu/octant/internal/util/json" + "github.com/vmware-tanzu/octant/pkg/api" oevent "github.com/vmware-tanzu/octant/pkg/event" "github.com/pkg/errors" @@ -89,12 +90,12 @@ func (c *ContextManager) SetContext(state octant.State, payload action.Payload) } // Start starts the manager. -func (c *ContextManager) Start(ctx context.Context, state octant.State, s OctantClient) { +func (c *ContextManager) Start(ctx context.Context, state octant.State, s api.OctantClient) { c.poller.Run(ctx, nil, c.runUpdate(state, s), event.DefaultScheduleDelay) c.ctx = ctx } -func (c *ContextManager) runUpdate(state octant.State, s OctantClient) PollerFunc { +func (c *ContextManager) runUpdate(state octant.State, s api.OctantClient) PollerFunc { var previous []byte logger := c.dashConfig.Logger() diff --git a/internal/api/context_manager_test.go b/internal/api/context_manager_test.go index 9dd09eeeed..20f6b50e68 100644 --- a/internal/api/context_manager_test.go +++ b/internal/api/context_manager_test.go @@ -15,11 +15,11 @@ import ( "github.com/golang/mock/gomock" "github.com/vmware-tanzu/octant/internal/api" - "github.com/vmware-tanzu/octant/internal/api/fake" configFake "github.com/vmware-tanzu/octant/internal/config/fake" "github.com/vmware-tanzu/octant/internal/log" "github.com/vmware-tanzu/octant/internal/octant" octantFake "github.com/vmware-tanzu/octant/internal/octant/fake" + "github.com/vmware-tanzu/octant/pkg/api/fake" ) func TestContextManager_Handlers(t *testing.T) { diff --git a/internal/api/filter_manager.go b/internal/api/filter_manager.go index 924754bda6..c76233332b 100644 --- a/internal/api/filter_manager.go +++ b/internal/api/filter_manager.go @@ -16,6 +16,7 @@ import ( "github.com/vmware-tanzu/octant/internal/octant" "github.com/vmware-tanzu/octant/pkg/action" + "github.com/vmware-tanzu/octant/pkg/api" ) const ( @@ -37,7 +38,7 @@ func NewFilterManager() *FilterManager { } // Start starts the manager. Current is a no-op. -func (fm *FilterManager) Start(ctx context.Context, state octant.State, s OctantClient) { +func (fm *FilterManager) Start(ctx context.Context, state octant.State, s api.OctantClient) { fm.ctx = ctx } diff --git a/internal/api/helper_manager.go b/internal/api/helper_manager.go index 8534618c47..6f3ab3bcf2 100644 --- a/internal/api/helper_manager.go +++ b/internal/api/helper_manager.go @@ -8,6 +8,7 @@ package api import ( "context" + "github.com/vmware-tanzu/octant/pkg/api" oevent "github.com/vmware-tanzu/octant/pkg/event" "github.com/vmware-tanzu/octant/internal/config" @@ -43,7 +44,7 @@ type HelperStateManager struct { poller Poller } -var _StateManager = (*HelperStateManager)(nil) +var _ StateManager = (*HelperStateManager)(nil) // NewHelperStateManager creates an instance of HelperStateManager func NewHelperStateManager(dashConfig config.Dash, options ...HelperStateManagerOption) *HelperStateManager { @@ -67,11 +68,11 @@ func (h *HelperStateManager) Handlers() []octant.ClientRequestHandler { } // Start starts the manager -func (h *HelperStateManager) Start(ctx context.Context, state octant.State, client OctantClient) { +func (h *HelperStateManager) Start(ctx context.Context, state octant.State, client api.OctantClient) { h.poller.Run(ctx, nil, h.runUpdate(state, client), event.DefaultScheduleDelay) } -func (h *HelperStateManager) runUpdate(state octant.State, client OctantClient) PollerFunc { +func (h *HelperStateManager) runUpdate(state octant.State, client api.OctantClient) PollerFunc { var buildInfoGenerated, kubeConfigPathGenerated bool return func(ctx context.Context) bool { diff --git a/internal/api/helper_manager_test.go b/internal/api/helper_manager_test.go index c6059a9770..a270182b46 100644 --- a/internal/api/helper_manager_test.go +++ b/internal/api/helper_manager_test.go @@ -14,10 +14,10 @@ import ( "github.com/golang/mock/gomock" "github.com/vmware-tanzu/octant/internal/api" - "github.com/vmware-tanzu/octant/internal/api/fake" configFake "github.com/vmware-tanzu/octant/internal/config/fake" "github.com/vmware-tanzu/octant/internal/log" octantFake "github.com/vmware-tanzu/octant/internal/octant/fake" + "github.com/vmware-tanzu/octant/pkg/api/fake" ) func TestHelperManager_GenerateContent(t *testing.T) { diff --git a/internal/api/loading_state.go b/internal/api/loading_state.go index c416a12ba8..e6f15c567e 100644 --- a/internal/api/loading_state.go +++ b/internal/api/loading_state.go @@ -15,6 +15,7 @@ import ( "sync/atomic" "time" + "github.com/vmware-tanzu/octant/pkg/api" "github.com/vmware-tanzu/octant/pkg/event" "github.com/spf13/afero" @@ -57,7 +58,7 @@ func (l *LoadingManager) Handlers() []octant.ClientRequestHandler { } } -func (l *LoadingManager) Start(ctx context.Context, state octant.State, client OctantClient) { +func (l *LoadingManager) Start(ctx context.Context, state octant.State, client api.OctantClient) { l.client.Store(client) l.ctx = ctx l.kubeConfigPath = ocontext.KubeConfigChFrom(ctx) @@ -76,7 +77,7 @@ func (l *LoadingManager) CheckLoading(state octant.State, payload action.Payload } if loading { - client := l.client.Load().(OctantClient) + client := l.client.Load().(api.OctantClient) client.Send(event.Event{ Type: event.EventTypeLoading, }) @@ -123,7 +124,7 @@ func (l *LoadingManager) UploadKubeConfig(state octant.State, payload action.Pay return err } - client := l.client.Load().(OctantClient) + client := l.client.Load().(api.OctantClient) client.Send(event.Event{ Type: event.EventTypeRefresh, }) @@ -132,7 +133,7 @@ func (l *LoadingManager) UploadKubeConfig(state octant.State, payload action.Pay return nil } -func (l *LoadingManager) WatchConfig(path chan string, client OctantClient, fs afero.Fs) { +func (l *LoadingManager) WatchConfig(path chan string, client api.OctantClient, fs afero.Fs) { kubeconfig := clientcmd.NewDefaultClientConfigLoadingRules().GetDefaultFilename() for { diff --git a/internal/api/loading_state_test.go b/internal/api/loading_state_test.go index 1a026bf8b2..08ce4a4222 100644 --- a/internal/api/loading_state_test.go +++ b/internal/api/loading_state_test.go @@ -17,7 +17,7 @@ import ( "k8s.io/client-go/tools/clientcmd" "github.com/vmware-tanzu/octant/internal/api" - "github.com/vmware-tanzu/octant/internal/api/fake" + "github.com/vmware-tanzu/octant/pkg/api/fake" ) func Test_watchConfig(t *testing.T) { diff --git a/internal/api/middleware.go b/internal/api/middleware.go index b4f7efd21a..614ea8a29b 100644 --- a/internal/api/middleware.go +++ b/internal/api/middleware.go @@ -82,7 +82,7 @@ func checkSameOrigin(r *http.Request) bool { // shouldAllowHost returns true if the incoming request.Host shuold be allowed // to access the API otherwise false. -func shouldAllowHost(host string, acceptedHosts []string) bool { +func ShouldAllowHost(host string, acceptedHosts []string) bool { if dashstrings.Contains("0.0.0.0", acceptedHosts) { return true } @@ -107,7 +107,7 @@ func rebindHandler(ctx context.Context, acceptedHosts []string) mux.MiddlewareFu } var httpErrors []string - if !shouldAllowHost(host, acceptedHosts) { + if !ShouldAllowHost(host, acceptedHosts) { logger := log.From(ctx) logger.Debugf("Requester %s not in accepted hosts: %s\nTo allow this host add it to the OCTANT_ACCEPTED_HOSTS environment variable.", host, acceptedHosts) httpErrors = append(httpErrors, "forbidden host") diff --git a/internal/api/middleware_test.go b/internal/api/middleware_test.go index 26b78de26f..326b19916b 100644 --- a/internal/api/middleware_test.go +++ b/internal/api/middleware_test.go @@ -87,7 +87,7 @@ func Test_rebindHandler(t *testing.T) { fmt.Fprint(w, "response") }) - wrapped := rebindHandler(context.TODO(), acceptedHosts())(fake) + wrapped := rebindHandler(context.TODO(), AcceptedHosts())(fake) ts := httptest.NewServer(wrapped) defer ts.Close() @@ -140,7 +140,7 @@ func Test_shouldAllowHost(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - require.Equal(t, tc.expected, shouldAllowHost(tc.host, tc.acceptedHosts)) + require.Equal(t, tc.expected, ShouldAllowHost(tc.host, tc.acceptedHosts)) }) } } diff --git a/internal/api/namespaces_manager.go b/internal/api/namespaces_manager.go index b9031eaebd..0d0127a806 100644 --- a/internal/api/namespaces_manager.go +++ b/internal/api/namespaces_manager.go @@ -17,6 +17,7 @@ import ( "github.com/vmware-tanzu/octant/internal/event" "github.com/vmware-tanzu/octant/internal/log" "github.com/vmware-tanzu/octant/internal/octant" + "github.com/vmware-tanzu/octant/pkg/api" oevent "github.com/vmware-tanzu/octant/pkg/event" ) @@ -75,7 +76,7 @@ func (n NamespacesManager) Handlers() []octant.ClientRequestHandler { } // Start starts the manager. It periodically generates a list of namespaces. -func (n *NamespacesManager) Start(ctx context.Context, state octant.State, s OctantClient) { +func (n *NamespacesManager) Start(ctx context.Context, state octant.State, s api.OctantClient) { ch := make(chan struct{}, 1) defer func() { close(ch) @@ -84,7 +85,7 @@ func (n *NamespacesManager) Start(ctx context.Context, state octant.State, s Oct n.poller.Run(ctx, ch, n.runUpdate(state, s), event.DefaultScheduleDelay) } -func (n *NamespacesManager) runUpdate(state octant.State, client OctantClient) PollerFunc { +func (n *NamespacesManager) runUpdate(state octant.State, client api.OctantClient) PollerFunc { var previous []byte return func(ctx context.Context) bool { diff --git a/internal/api/namespaces_manager_test.go b/internal/api/namespaces_manager_test.go index 4d03dae4c4..0b0a6bed6f 100644 --- a/internal/api/namespaces_manager_test.go +++ b/internal/api/namespaces_manager_test.go @@ -13,10 +13,10 @@ import ( "github.com/stretchr/testify/require" "github.com/vmware-tanzu/octant/internal/api" - "github.com/vmware-tanzu/octant/internal/api/fake" clusterFake "github.com/vmware-tanzu/octant/internal/cluster/fake" configFake "github.com/vmware-tanzu/octant/internal/config/fake" octantFake "github.com/vmware-tanzu/octant/internal/octant/fake" + "github.com/vmware-tanzu/octant/pkg/api/fake" ) func TestNamespacesManager_GenerateNamespaces(t *testing.T) { diff --git a/internal/api/navigation_manager.go b/internal/api/navigation_manager.go index 1204bd5b67..a8a255bd05 100644 --- a/internal/api/navigation_manager.go +++ b/internal/api/navigation_manager.go @@ -13,6 +13,7 @@ import ( "github.com/vmware-tanzu/octant/internal/util/json" + "github.com/vmware-tanzu/octant/pkg/api" oevent "github.com/vmware-tanzu/octant/pkg/event" "github.com/pkg/errors" @@ -80,7 +81,7 @@ func (n NavigationManager) Handlers() []octant.ClientRequestHandler { } // Start starts the manager. It periodically generates navigation updates. -func (n *NavigationManager) Start(ctx context.Context, state octant.State, s OctantClient) { +func (n *NavigationManager) Start(ctx context.Context, state octant.State, s api.OctantClient) { ch := make(chan struct{}, 1) defer func() { close(ch) @@ -89,7 +90,7 @@ func (n *NavigationManager) Start(ctx context.Context, state octant.State, s Oct n.poller.Run(ctx, ch, n.runUpdate(state, s), event.DefaultScheduleDelay) } -func (n *NavigationManager) runUpdate(state octant.State, client OctantClient) PollerFunc { +func (n *NavigationManager) runUpdate(state octant.State, client api.OctantClient) PollerFunc { var previous []byte return func(ctx context.Context) bool { diff --git a/internal/api/navigation_manager_test.go b/internal/api/navigation_manager_test.go index 7d602930e8..dee844cf1d 100644 --- a/internal/api/navigation_manager_test.go +++ b/internal/api/navigation_manager_test.go @@ -13,12 +13,12 @@ import ( "github.com/stretchr/testify/require" "github.com/vmware-tanzu/octant/internal/api" - "github.com/vmware-tanzu/octant/internal/api/fake" configFake "github.com/vmware-tanzu/octant/internal/config/fake" "github.com/vmware-tanzu/octant/internal/module" moduleFake "github.com/vmware-tanzu/octant/internal/module/fake" "github.com/vmware-tanzu/octant/internal/octant" octantFake "github.com/vmware-tanzu/octant/internal/octant/fake" + "github.com/vmware-tanzu/octant/pkg/api/fake" "github.com/vmware-tanzu/octant/pkg/navigation" ) diff --git a/internal/api/state_manager.go b/internal/api/state_manager.go new file mode 100644 index 0000000000..008e4d5125 --- /dev/null +++ b/internal/api/state_manager.go @@ -0,0 +1,10 @@ +/* + Copyright (c) 2019 the Octant contributors. All Rights Reserved. + SPDX-License-Identifier: Apache-2.0 +*/ + +package api + +import "github.com/vmware-tanzu/octant/pkg/api" + +type StateManager = api.StateManager diff --git a/internal/api/streaming_service.go b/internal/api/streaming_service.go new file mode 100644 index 0000000000..e4da453332 --- /dev/null +++ b/internal/api/streaming_service.go @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2019 the Octant contributors. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package api + +import ( + "fmt" + "net/http" + + "github.com/vmware-tanzu/octant/internal/config" + "github.com/vmware-tanzu/octant/pkg/api" +) + +func streamService(manager api.ClientManager, dashConfig config.Dash) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + serveStreamingApi(manager, dashConfig, w, r) + } +} + +func serveStreamingApi(manager api.ClientManager, dashConfig config.Dash, w http.ResponseWriter, r *http.Request) { + _, err := manager.ClientFromRequest(dashConfig, w, r) + if err != nil { + if dashConfig != nil { + logger := dashConfig.Logger() + logger.WithErr(err).Errorf("create websocket client") + } + return + } +} + +// Create dummy websocketService and serveWebsocket +func loadingStreamService(manager api.ClientManager) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + serveLoadingStreamingApi(manager, w, r) + } +} + +func serveLoadingStreamingApi(manager api.ClientManager, w http.ResponseWriter, r *http.Request) { + _, err := manager.TemporaryClientFromLoadingRequest(w, r) + if err != nil { + fmt.Println("create loading websocket client") + } +} diff --git a/internal/api/terminal_manager.go b/internal/api/terminal_manager.go index a2fb79d3d9..e35a57c2ae 100644 --- a/internal/api/terminal_manager.go +++ b/internal/api/terminal_manager.go @@ -15,6 +15,7 @@ import ( "github.com/vmware-tanzu/octant/internal/util/kubernetes" + "github.com/vmware-tanzu/octant/pkg/api" "github.com/vmware-tanzu/octant/pkg/event" "github.com/pkg/errors" @@ -36,7 +37,7 @@ const ( ) type terminalStateManager struct { - client OctantClient + client api.OctantClient config config.Dash ctx context.Context instance terminal.Instance @@ -212,7 +213,7 @@ func (s *terminalStateManager) SendTerminalCommand(state octant.State, payload a return s.instance.Write([]byte(key)) } -func (s *terminalStateManager) Start(ctx context.Context, state octant.State, client OctantClient) { +func (s *terminalStateManager) Start(ctx context.Context, state octant.State, client api.OctantClient) { s.client = client s.ctx = ctx } diff --git a/internal/api/terminal_manager_test.go b/internal/api/terminal_manager_test.go index 053c10a593..ecd581587e 100644 --- a/internal/api/terminal_manager_test.go +++ b/internal/api/terminal_manager_test.go @@ -12,9 +12,9 @@ import ( "github.com/golang/mock/gomock" - "github.com/vmware-tanzu/octant/internal/api/fake" configFake "github.com/vmware-tanzu/octant/internal/config/fake" octantFake "github.com/vmware-tanzu/octant/internal/octant/fake" + "github.com/vmware-tanzu/octant/pkg/api/fake" ) func Test_TerminalStateManager(t *testing.T) { diff --git a/internal/api/websocket_client_manager.go b/internal/api/websocket_client_manager.go deleted file mode 100644 index 79ca78a595..0000000000 --- a/internal/api/websocket_client_manager.go +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Copyright (c) 2019 the Octant contributors. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0 - */ - -package api - -import ( - "context" - "net/http" - - "github.com/vmware-tanzu/octant/pkg/event" - - "github.com/google/uuid" - - "github.com/vmware-tanzu/octant/internal/config" -) - -//go:generate mockgen -destination=./fake/mock_client_manager.go -package=fake github.com/vmware-tanzu/octant/internal/api ClientManager - -// ClientManager is an interface for managing clients. -type ClientManager interface { - Run(ctx context.Context) - Clients() []*WebsocketClient - ClientFromRequest(dashConfig config.Dash, w http.ResponseWriter, r *http.Request) (*WebsocketClient, error) - TemporaryClientFromLoadingRequest(w http.ResponseWriter, r *http.Request) (*WebsocketClient, error) - Get(id string) event.WSEventSender -} - -type clientMeta struct { - cancelFunc context.CancelFunc - client *WebsocketClient -} - -// WebsocketClientManager is a client manager for websockets. -type WebsocketClientManager struct { - // clients is the currently registered clients. - clients map[*WebsocketClient]context.CancelFunc - - // Register registers requests from clients. - register chan *clientMeta - - // unregister unregisters request from clients. - unregister chan *WebsocketClient - - // list populates a client list - requestList chan bool - recvList chan []*WebsocketClient - - ctx context.Context - actionDispatcher ActionDispatcher -} - -var _ ClientManager = (*WebsocketClientManager)(nil) - -// NewWebsocketClientManager creates an instance of WebsocketClientManager. -func NewWebsocketClientManager(ctx context.Context, dispatcher ActionDispatcher) *WebsocketClientManager { - return &WebsocketClientManager{ - ctx: ctx, - clients: make(map[*WebsocketClient]context.CancelFunc), - register: make(chan *clientMeta), - unregister: make(chan *WebsocketClient), - requestList: make(chan bool), - recvList: make(chan []*WebsocketClient), - actionDispatcher: dispatcher, - } -} - -func (m *WebsocketClientManager) Clients() []*WebsocketClient { - m.requestList <- true - clients := <-m.recvList - return clients -} - -// Run runs the manager. It manages multiple websocket clients. -func (m *WebsocketClientManager) Run(ctx context.Context) { - done := false - for !done { - select { - case <-ctx.Done(): - done = true - case meta := <-m.register: - m.clients[meta.client] = meta.cancelFunc - case client := <-m.unregister: - if cancelFunc, ok := m.clients[client]; ok { - cancelFunc() - delete(m.clients, client) - } - case <-m.requestList: - clients := []*WebsocketClient{} - for client := range m.clients { - clients = append(clients, client) - } - m.recvList <- clients - } - } -} - -// ClientFromRequest creates a websocket client from a http request. -func (m *WebsocketClientManager) ClientFromRequest(dashConfig config.Dash, w http.ResponseWriter, r *http.Request) (*WebsocketClient, error) { - clientID, err := uuid.NewUUID() - if err != nil { - return nil, err - } - - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - return nil, err - } - - ctx, cancel := context.WithCancel(m.ctx) - client := NewWebsocketClient(ctx, conn, m, dashConfig, m.actionDispatcher, clientID) - m.register <- &clientMeta{ - cancelFunc: func() { - cancel() - m.unregister <- client - }, - client: client, - } - - return client, nil -} - -func (m *WebsocketClientManager) TemporaryClientFromLoadingRequest(w http.ResponseWriter, r *http.Request) (*WebsocketClient, error) { - clientID, err := uuid.NewUUID() - if err != nil { - return nil, err - } - - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - return nil, err - } - - ctx, cancel := context.WithCancel(m.ctx) - client := NewTemporaryWebsocketClient(ctx, conn, m, m.actionDispatcher, clientID) - m.register <- &clientMeta{ - cancelFunc: func() { - cancel() - m.unregister <- client - }, - client: client, - } - - return client, nil -} - -func (m *WebsocketClientManager) Get(id string) event.WSEventSender { - for _, client := range m.Clients() { - if id == client.ID() { - return client - } - } - return nil -} diff --git a/internal/api/websocket_service.go b/internal/api/websocket_service.go deleted file mode 100644 index 603745fa27..0000000000 --- a/internal/api/websocket_service.go +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright (c) 2019 the Octant contributors. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0 - */ - -package api - -import ( - "fmt" - "net" - "net/http" - - "github.com/gorilla/websocket" - - "github.com/vmware-tanzu/octant/internal/config" -) - -var ( - upgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: func(r *http.Request) bool { - host, _, err := net.SplitHostPort(r.RemoteAddr) - if err != nil { - return false - } - - return shouldAllowHost(host, acceptedHosts()) - }, - } -) - -func websocketService(manager ClientManager, dashConfig config.Dash) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - serveWebsocket(manager, dashConfig, w, r) - } -} - -func serveWebsocket(manager ClientManager, dashConfig config.Dash, w http.ResponseWriter, r *http.Request) { - client, err := manager.ClientFromRequest(dashConfig, w, r) - if err != nil { - if dashConfig != nil { - logger := dashConfig.Logger() - logger.WithErr(err).Errorf("create websocket client") - } - return - } - - go client.readPump() - go client.writePump() -} - -// Create dummy websocketService and serveWebsocket -func loadingWebsocketService(manager ClientManager) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - serveLoadingWebsocket(manager, w, r) - } -} - -func serveLoadingWebsocket(manager ClientManager, w http.ResponseWriter, r *http.Request) { - client, err := manager.TemporaryClientFromLoadingRequest(w, r) - if err != nil { - fmt.Println("create loading websocket client") - } - - go client.readPump() - go client.writePump() -} diff --git a/internal/api/websocket_service_test.go b/internal/api/websocket_service_test.go index b72c78982b..86b6a7fda5 100644 --- a/internal/api/websocket_service_test.go +++ b/internal/api/websocket_service_test.go @@ -6,17 +6,18 @@ import ( "testing" "github.com/vmware-tanzu/octant/internal/config" + "github.com/vmware-tanzu/octant/pkg/api" ) type fakeWebsocketClientManager struct { - WebsocketClientManager + api.StreamingConnectionManager } -func (c *fakeWebsocketClientManager) ClientFromRequest(dashConfig config.Dash, w http.ResponseWriter, r *http.Request) (*WebsocketClient, error) { +func (c *fakeWebsocketClientManager) ClientFromRequest(dashConfig config.Dash, w http.ResponseWriter, r *http.Request) (api.StreamingClient, error) { return nil, fmt.Errorf("test: error") } func TestWebsocketService_serveWebsocket(t *testing.T) { f := &fakeWebsocketClientManager{} - serveWebsocket(f, nil, nil, nil) + serveStreamingApi(f, nil, nil, nil) } diff --git a/internal/api/action.go b/pkg/api/action.go similarity index 84% rename from internal/api/action.go rename to pkg/api/action.go index 154e85cf6e..11881a5276 100644 --- a/internal/api/action.go +++ b/pkg/api/action.go @@ -11,7 +11,7 @@ import ( "github.com/vmware-tanzu/octant/pkg/action" ) -//go:generate mockgen -destination=./fake/mock_action_dispatcher.go -package=fake github.com/vmware-tanzu/octant/internal/api ActionDispatcher +//go:generate mockgen -destination=./fake/mock_action_dispatcher.go -package=fake github.com/vmware-tanzu/octant/pkg/api ActionDispatcher // ActionDispatcher dispatches actions. type ActionDispatcher interface { diff --git a/internal/api/fake/mock_action_dispatcher.go b/pkg/api/fake/mock_action_dispatcher.go similarity index 94% rename from internal/api/fake/mock_action_dispatcher.go rename to pkg/api/fake/mock_action_dispatcher.go index 496128dbf5..46c8966f64 100644 --- a/internal/api/fake/mock_action_dispatcher.go +++ b/pkg/api/fake/mock_action_dispatcher.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/vmware-tanzu/octant/internal/api (interfaces: ActionDispatcher) +// Source: github.com/vmware-tanzu/octant/pkg/api (interfaces: ActionDispatcher) // Package fake is a generated GoMock package. package fake diff --git a/pkg/api/fake/mock_client_factory.go b/pkg/api/fake/mock_client_factory.go new file mode 100644 index 0000000000..4b70ca13bf --- /dev/null +++ b/pkg/api/fake/mock_client_factory.go @@ -0,0 +1,71 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/vmware-tanzu/octant/pkg/api (interfaces: StreamingClientFactory) + +// Package fake is a generated GoMock package. +package fake + +import ( + context "context" + http "net/http" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + + config "github.com/vmware-tanzu/octant/internal/config" + api "github.com/vmware-tanzu/octant/pkg/api" +) + +// MockStreamingClientFactory is a mock of StreamingClientFactory interface +type MockStreamingClientFactory struct { + ctrl *gomock.Controller + recorder *MockStreamingClientFactoryMockRecorder +} + +// MockStreamingClientFactoryMockRecorder is the mock recorder for MockStreamingClientFactory +type MockStreamingClientFactoryMockRecorder struct { + mock *MockStreamingClientFactory +} + +// NewMockStreamingClientFactory creates a new mock instance +func NewMockStreamingClientFactory(ctrl *gomock.Controller) *MockStreamingClientFactory { + mock := &MockStreamingClientFactory{ctrl: ctrl} + mock.recorder = &MockStreamingClientFactoryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockStreamingClientFactory) EXPECT() *MockStreamingClientFactoryMockRecorder { + return m.recorder +} + +// NewConnection mocks base method +func (m *MockStreamingClientFactory) NewConnection(arg0 http.ResponseWriter, arg1 *http.Request, arg2 api.ClientManager, arg3 config.Dash) (api.StreamingClient, context.CancelFunc, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewConnection", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(api.StreamingClient) + ret1, _ := ret[1].(context.CancelFunc) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// NewConnection indicates an expected call of NewConnection +func (mr *MockStreamingClientFactoryMockRecorder) NewConnection(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewConnection", reflect.TypeOf((*MockStreamingClientFactory)(nil).NewConnection), arg0, arg1, arg2, arg3) +} + +// NewTemporaryConnection mocks base method +func (m *MockStreamingClientFactory) NewTemporaryConnection(arg0 http.ResponseWriter, arg1 *http.Request, arg2 api.ClientManager) (api.StreamingClient, context.CancelFunc, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewTemporaryConnection", arg0, arg1, arg2) + ret0, _ := ret[0].(api.StreamingClient) + ret1, _ := ret[1].(context.CancelFunc) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// NewTemporaryConnection indicates an expected call of NewTemporaryConnection +func (mr *MockStreamingClientFactoryMockRecorder) NewTemporaryConnection(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewTemporaryConnection", reflect.TypeOf((*MockStreamingClientFactory)(nil).NewTemporaryConnection), arg0, arg1, arg2) +} diff --git a/internal/api/fake/mock_client_manager.go b/pkg/api/fake/mock_client_manager.go similarity index 70% rename from internal/api/fake/mock_client_manager.go rename to pkg/api/fake/mock_client_manager.go index 0a87157321..a1c271b10c 100644 --- a/internal/api/fake/mock_client_manager.go +++ b/pkg/api/fake/mock_client_manager.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/vmware-tanzu/octant/internal/api (interfaces: ClientManager) +// Source: github.com/vmware-tanzu/octant/pkg/api (interfaces: ClientManager) // Package fake is a generated GoMock package. package fake @@ -11,8 +11,8 @@ import ( gomock "github.com/golang/mock/gomock" - api "github.com/vmware-tanzu/octant/internal/api" config "github.com/vmware-tanzu/octant/internal/config" + api "github.com/vmware-tanzu/octant/pkg/api" event "github.com/vmware-tanzu/octant/pkg/event" ) @@ -39,11 +39,25 @@ func (m *MockClientManager) EXPECT() *MockClientManagerMockRecorder { return m.recorder } +// ActionDispatcher mocks base method +func (m *MockClientManager) ActionDispatcher() api.ActionDispatcher { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ActionDispatcher") + ret0, _ := ret[0].(api.ActionDispatcher) + return ret0 +} + +// ActionDispatcher indicates an expected call of ActionDispatcher +func (mr *MockClientManagerMockRecorder) ActionDispatcher() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ActionDispatcher", reflect.TypeOf((*MockClientManager)(nil).ActionDispatcher)) +} + // ClientFromRequest mocks base method -func (m *MockClientManager) ClientFromRequest(arg0 config.Dash, arg1 http.ResponseWriter, arg2 *http.Request) (*api.WebsocketClient, error) { +func (m *MockClientManager) ClientFromRequest(arg0 config.Dash, arg1 http.ResponseWriter, arg2 *http.Request) (api.StreamingClient, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ClientFromRequest", arg0, arg1, arg2) - ret0, _ := ret[0].(*api.WebsocketClient) + ret0, _ := ret[0].(api.StreamingClient) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -55,10 +69,10 @@ func (mr *MockClientManagerMockRecorder) ClientFromRequest(arg0, arg1, arg2 inte } // Clients mocks base method -func (m *MockClientManager) Clients() []*api.WebsocketClient { +func (m *MockClientManager) Clients() []api.StreamingClient { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Clients") - ret0, _ := ret[0].([]*api.WebsocketClient) + ret0, _ := ret[0].([]api.StreamingClient) return ret0 } @@ -68,6 +82,20 @@ func (mr *MockClientManagerMockRecorder) Clients() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Clients", reflect.TypeOf((*MockClientManager)(nil).Clients)) } +// Context mocks base method +func (m *MockClientManager) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context +func (mr *MockClientManagerMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockClientManager)(nil).Context)) +} + // Get mocks base method func (m *MockClientManager) Get(arg0 string) event.WSEventSender { m.ctrl.T.Helper() @@ -95,10 +123,10 @@ func (mr *MockClientManagerMockRecorder) Run(arg0 interface{}) *gomock.Call { } // TemporaryClientFromLoadingRequest mocks base method -func (m *MockClientManager) TemporaryClientFromLoadingRequest(arg0 http.ResponseWriter, arg1 *http.Request) (*api.WebsocketClient, error) { +func (m *MockClientManager) TemporaryClientFromLoadingRequest(arg0 http.ResponseWriter, arg1 *http.Request) (api.StreamingClient, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "TemporaryClientFromLoadingRequest", arg0, arg1) - ret0, _ := ret[0].(*api.WebsocketClient) + ret0, _ := ret[0].(api.StreamingClient) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/internal/api/fake/mock_octant_client.go b/pkg/api/fake/mock_octant_client.go similarity index 96% rename from internal/api/fake/mock_octant_client.go rename to pkg/api/fake/mock_octant_client.go index 14ff44fa7a..3fa573078b 100644 --- a/internal/api/fake/mock_octant_client.go +++ b/pkg/api/fake/mock_octant_client.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/vmware-tanzu/octant/internal/api (interfaces: OctantClient) +// Source: github.com/vmware-tanzu/octant/pkg/api (interfaces: OctantClient) // Package fake is a generated GoMock package. package fake diff --git a/internal/api/fake/mock_state_manager.go b/pkg/api/fake/mock_state_manager.go similarity index 93% rename from internal/api/fake/mock_state_manager.go rename to pkg/api/fake/mock_state_manager.go index 920fe08ca2..9ebe21a39e 100644 --- a/internal/api/fake/mock_state_manager.go +++ b/pkg/api/fake/mock_state_manager.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/vmware-tanzu/octant/internal/api (interfaces: StateManager) +// Source: github.com/vmware-tanzu/octant/pkg/api (interfaces: StateManager) // Package fake is a generated GoMock package. package fake @@ -10,8 +10,8 @@ import ( gomock "github.com/golang/mock/gomock" - api "github.com/vmware-tanzu/octant/internal/api" octant "github.com/vmware-tanzu/octant/internal/octant" + api "github.com/vmware-tanzu/octant/pkg/api" ) // MockStateManager is a mock of StateManager interface diff --git a/pkg/api/fake/mock_streaming_client.go b/pkg/api/fake/mock_streaming_client.go new file mode 100644 index 0000000000..0595495573 --- /dev/null +++ b/pkg/api/fake/mock_streaming_client.go @@ -0,0 +1,121 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/vmware-tanzu/octant/pkg/api (interfaces: StreamingClient) + +// Package fake is a generated GoMock package. +package fake + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + + octant "github.com/vmware-tanzu/octant/internal/octant" + api "github.com/vmware-tanzu/octant/pkg/api" + event "github.com/vmware-tanzu/octant/pkg/event" +) + +// MockStreamingClient is a mock of StreamingClient interface +type MockStreamingClient struct { + ctrl *gomock.Controller + recorder *MockStreamingClientMockRecorder +} + +// MockStreamingClientMockRecorder is the mock recorder for MockStreamingClient +type MockStreamingClientMockRecorder struct { + mock *MockStreamingClient +} + +// NewMockStreamingClient creates a new mock instance +func NewMockStreamingClient(ctrl *gomock.Controller) *MockStreamingClient { + mock := &MockStreamingClient{ctrl: ctrl} + mock.recorder = &MockStreamingClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockStreamingClient) EXPECT() *MockStreamingClientMockRecorder { + return m.recorder +} + +// Handlers mocks base method +func (m *MockStreamingClient) Handlers() map[string][]octant.ClientRequestHandler { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Handlers") + ret0, _ := ret[0].(map[string][]octant.ClientRequestHandler) + return ret0 +} + +// Handlers indicates an expected call of Handlers +func (mr *MockStreamingClientMockRecorder) Handlers() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Handlers", reflect.TypeOf((*MockStreamingClient)(nil).Handlers)) +} + +// ID mocks base method +func (m *MockStreamingClient) ID() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ID") + ret0, _ := ret[0].(string) + return ret0 +} + +// ID indicates an expected call of ID +func (mr *MockStreamingClientMockRecorder) ID() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ID", reflect.TypeOf((*MockStreamingClient)(nil).ID)) +} + +// Receive mocks base method +func (m *MockStreamingClient) Receive() (api.StreamRequest, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Receive") + ret0, _ := ret[0].(api.StreamRequest) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Receive indicates an expected call of Receive +func (mr *MockStreamingClientMockRecorder) Receive() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Receive", reflect.TypeOf((*MockStreamingClient)(nil).Receive)) +} + +// Send mocks base method +func (m *MockStreamingClient) Send(arg0 event.Event) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Send", arg0) +} + +// Send indicates an expected call of Send +func (mr *MockStreamingClientMockRecorder) Send(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockStreamingClient)(nil).Send), arg0) +} + +// State mocks base method +func (m *MockStreamingClient) State() octant.State { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "State") + ret0, _ := ret[0].(octant.State) + return ret0 +} + +// State indicates an expected call of State +func (mr *MockStreamingClientMockRecorder) State() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "State", reflect.TypeOf((*MockStreamingClient)(nil).State)) +} + +// StopCh mocks base method +func (m *MockStreamingClient) StopCh() <-chan struct{} { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StopCh") + ret0, _ := ret[0].(<-chan struct{}) + return ret0 +} + +// StopCh indicates an expected call of StopCh +func (mr *MockStreamingClientMockRecorder) StopCh() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StopCh", reflect.TypeOf((*MockStreamingClient)(nil).StopCh)) +} diff --git a/pkg/api/state_manager.go b/pkg/api/state_manager.go new file mode 100644 index 0000000000..0c6f2ed0dd --- /dev/null +++ b/pkg/api/state_manager.go @@ -0,0 +1,19 @@ +/* + Copyright (c) 2019 the Octant contributors. All Rights Reserved. + SPDX-License-Identifier: Apache-2.0 +*/ + +package api + +import ( + "context" + + "github.com/vmware-tanzu/octant/internal/octant" +) + +//go:generate mockgen -destination=./fake/mock_state_manager.go -package=fake github.com/vmware-tanzu/octant/pkg/api StateManager +// StateManager manages states for WebsocketState. +type StateManager interface { + Handlers() []octant.ClientRequestHandler + Start(ctx context.Context, state octant.State, s OctantClient) +} diff --git a/pkg/api/streaming.go b/pkg/api/streaming.go new file mode 100644 index 0000000000..a7ccdcce74 --- /dev/null +++ b/pkg/api/streaming.go @@ -0,0 +1,60 @@ +/* + Copyright (c) 2019 the Octant contributors. All Rights Reserved. + SPDX-License-Identifier: Apache-2.0 +*/ +package api + +import ( + "context" + "net/http" + + "github.com/vmware-tanzu/octant/internal/config" + "github.com/vmware-tanzu/octant/internal/octant" + "github.com/vmware-tanzu/octant/pkg/action" + "github.com/vmware-tanzu/octant/pkg/event" +) + +//go:generate mockgen -destination=./fake/mock_client_manager.go -package=fake github.com/vmware-tanzu/octant/pkg/api ClientManager +//go:generate mockgen -destination=./fake/mock_client_factory.go -package=fake github.com/vmware-tanzu/octant/pkg/api StreamingClientFactory +//go:generate mockgen -destination=./fake/mock_streaming_client.go -package=fake github.com/vmware-tanzu/octant/pkg/api StreamingClient +//go:generate mockgen -destination=./fake/mock_octant_client.go -package=fake github.com/vmware-tanzu/octant/pkg/api OctantClient + +// ClientManager is an interface for managing clients. +type ClientManager interface { + Run(ctx context.Context) + Clients() []StreamingClient + ClientFromRequest(dashConfig config.Dash, w http.ResponseWriter, r *http.Request) (StreamingClient, error) + TemporaryClientFromLoadingRequest(w http.ResponseWriter, r *http.Request) (StreamingClient, error) + Get(id string) event.WSEventSender + Context() context.Context + ActionDispatcher() ActionDispatcher +} + +type StreamRequest struct { + Type string `json:"type"` + Payload action.Payload `json:"payload"` +} + +// OctantClient is the interface responsible for sending streaming data to a +// users session, usually in a browser. +type OctantClient interface { + Send(event.Event) + ID() string + StopCh() <-chan struct{} +} + +// StreamingClient is the interface responsible for sending and receiving +// streaming data to a users session, usually in a browser. +type StreamingClient interface { + OctantClient + + Receive() (StreamRequest, error) + + Handlers() map[string][]octant.ClientRequestHandler + State() octant.State +} + +type StreamingClientFactory interface { + NewConnection(http.ResponseWriter, *http.Request, ClientManager, config.Dash) (StreamingClient, context.CancelFunc, error) + NewTemporaryConnection(http.ResponseWriter, *http.Request, ClientManager) (StreamingClient, context.CancelFunc, error) +} diff --git a/pkg/api/streaming_connection_manager.go b/pkg/api/streaming_connection_manager.go new file mode 100644 index 0000000000..e05b922a78 --- /dev/null +++ b/pkg/api/streaming_connection_manager.go @@ -0,0 +1,135 @@ +/* + * Copyright (c) 2019 the Octant contributors. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ +package api + +import ( + "context" + "net/http" + + "github.com/vmware-tanzu/octant/internal/config" + "github.com/vmware-tanzu/octant/pkg/event" +) + +var _ ClientManager = (*StreamingConnectionManager)(nil) + +// StreamingConnectionManager is a client manager for streams. +type StreamingConnectionManager struct { + clientFactory StreamingClientFactory + + // clients is the currently registered clients. + clients map[StreamingClient]context.CancelFunc + + // Register registers requests from clients. + register chan *clientMeta + + // unregister unregisters request from clients. + unregister chan StreamingClient + + // list populates a client list + requestList chan bool + recvList chan []StreamingClient + + ctx context.Context + actionDispatcher ActionDispatcher +} + +type clientMeta struct { + cancelFunc context.CancelFunc + client StreamingClient +} + +// NewStreamingConnectionManager creates an instance of WebsocketClientManager. +func NewStreamingConnectionManager(ctx context.Context, dispatcher ActionDispatcher, clientFactory StreamingClientFactory) *StreamingConnectionManager { + return &StreamingConnectionManager{ + ctx: ctx, + clients: make(map[StreamingClient]context.CancelFunc), + register: make(chan *clientMeta), + unregister: make(chan StreamingClient), + requestList: make(chan bool), + recvList: make(chan []StreamingClient), + actionDispatcher: dispatcher, + clientFactory: clientFactory, + } +} + +func (m *StreamingConnectionManager) Clients() []StreamingClient { + m.requestList <- true + clients := <-m.recvList + return clients +} + +// Run runs the manager. It manages multiple websocket clients. +func (m *StreamingConnectionManager) Run(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case meta := <-m.register: + m.clients[meta.client] = meta.cancelFunc + case client := <-m.unregister: + if cancelFunc, ok := m.clients[client]; ok { + cancelFunc() + delete(m.clients, client) + } + case <-m.requestList: + clients := []StreamingClient{} + for client := range m.clients { + clients = append(clients, client) + } + m.recvList <- clients + } + } +} + +// ClientFromRequest creates a websocket client from a http request. +func (m *StreamingConnectionManager) ClientFromRequest(dashConfig config.Dash, w http.ResponseWriter, r *http.Request) (StreamingClient, error) { + + client, cancel, err := m.clientFactory.NewConnection(w, r, m, dashConfig) + if err != nil { + return nil, err + } + m.register <- &clientMeta{ + cancelFunc: func() { + cancel() + m.unregister <- client + }, + client: client, + } + + return client, nil +} + +func (m *StreamingConnectionManager) TemporaryClientFromLoadingRequest(w http.ResponseWriter, r *http.Request) (StreamingClient, error) { + client, cancel, err := m.clientFactory.NewTemporaryConnection(w, r, m) + if err != nil { + return nil, err + } + m.register <- &clientMeta{ + cancelFunc: func() { + cancel() + m.unregister <- client + }, + client: client, + } + + return client, nil +} + +func (m *StreamingConnectionManager) Get(id string) event.WSEventSender { + for _, client := range m.Clients() { + if id == client.ID() { + return client + } + } + return nil +} + +func (m *StreamingConnectionManager) Context() context.Context { + return m.ctx +} + +func (m *StreamingConnectionManager) ActionDispatcher() ActionDispatcher { + return m.actionDispatcher +} diff --git a/internal/api/websocket_client.go b/pkg/api/websockets/websocket_client.go similarity index 78% rename from internal/api/websocket_client.go rename to pkg/api/websockets/websocket_client.go index c288b3976a..71fb50b2b6 100644 --- a/internal/api/websocket_client.go +++ b/pkg/api/websockets/websocket_client.go @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package api +package websockets import ( "context" @@ -12,6 +12,9 @@ import ( "time" "github.com/vmware-tanzu/octant/internal/util/json" + "github.com/vmware-tanzu/octant/pkg/api" + "github.com/vmware-tanzu/octant/pkg/errors" + "github.com/vmware-tanzu/octant/pkg/log" "github.com/vmware-tanzu/octant/pkg/event" @@ -23,7 +26,6 @@ import ( internalLog "github.com/vmware-tanzu/octant/internal/log" "github.com/vmware-tanzu/octant/internal/octant" "github.com/vmware-tanzu/octant/pkg/action" - "github.com/vmware-tanzu/octant/pkg/log" ) const ( @@ -49,7 +51,7 @@ type WebsocketClient struct { logger log.Logger ctx context.Context cancel context.CancelFunc - manager *WebsocketClientManager + manager api.ClientManager isOpen atomic.Value state octant.State @@ -58,10 +60,8 @@ type WebsocketClient struct { stopCh chan struct{} } -var _ OctantClient = (*WebsocketClient)(nil) - // NewWebsocketClient creates an instance of WebsocketClient. -func NewWebsocketClient(ctx context.Context, conn *websocket.Conn, manager *WebsocketClientManager, dashConfig config.Dash, actionDispatcher ActionDispatcher, id uuid.UUID) *WebsocketClient { +func NewWebsocketClient(ctx context.Context, conn *websocket.Conn, manager api.ClientManager, dashConfig config.Dash, actionDispatcher api.ActionDispatcher, id uuid.UUID) *WebsocketClient { logger := dashConfig.Logger().With("component", "websocket-client", "client-id", id.String()) ctx = internalLog.WithLoggerContext(ctx, logger) @@ -98,7 +98,7 @@ func NewWebsocketClient(ctx context.Context, conn *websocket.Conn, manager *Webs } // NewTemporaryWebsocketClient creates an instance of WebsocketClient -func NewTemporaryWebsocketClient(ctx context.Context, conn *websocket.Conn, manager *WebsocketClientManager, actionDispatcher ActionDispatcher, id uuid.UUID) *WebsocketClient { +func NewTemporaryWebsocketClient(ctx context.Context, conn *websocket.Conn, manager api.ClientManager, actionDispatcher api.ActionDispatcher, id uuid.UUID) *WebsocketClient { ctx, cancel := context.WithCancel(ctx) logger := internalLog.From(ctx) @@ -130,6 +130,10 @@ func (c *WebsocketClient) ID() string { return c.id.String() } +func (c *WebsocketClient) Handlers() map[string][]octant.ClientRequestHandler { + return c.handlers +} + func (c *WebsocketClient) readPump() { defer func() { c.isOpen.Store(false) @@ -156,16 +160,17 @@ func (c *WebsocketClient) readPump() { }) for { - _, message, err := c.conn.ReadMessage() + request, err := c.Receive() if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNoStatusReceived) { - c.logger.WithErr(err).Errorf("Unhandled websocket error") + if errors.IsFatalStreamError(err) { + c.cancel() + break } - c.cancel() - break + + continue } - if err := c.handle(message); err != nil { + if err := handleStreamingMessage(c, request); err != nil { c.logger.WithErr(err).Errorf("Handle websocket message") } } @@ -173,27 +178,22 @@ func (c *WebsocketClient) readPump() { close(c.stopCh) } -func (c *WebsocketClient) handle(message []byte) error { - var request websocketRequest - if err := json.Unmarshal(message, &request); err != nil { - return err - } - - handlers, ok := c.handlers[request.Type] +func handleStreamingMessage(client api.StreamingClient, request api.StreamRequest) error { + handlers, ok := client.Handlers()[request.Type] if !ok { - return c.handleUnknownRequest(request) + return handleUnknownRequest(client, request) } var g errgroup.Group for _, handler := range handlers { g.Go(func() error { - return handler.Handler(c.state, request.Payload) + return handler.Handler(client.State(), request.Payload) }) } if err := g.Wait(); err != nil { - c.Send(event.CreateEvent("handlerError", action.Payload{ + client.Send(event.CreateEvent("handlerError", action.Payload{ "requestType": request.Type, "error": err.Error(), })) @@ -203,7 +203,7 @@ func (c *WebsocketClient) handle(message []byte) error { return nil } -func (c *WebsocketClient) handleUnknownRequest(request websocketRequest) error { +func handleUnknownRequest(client api.OctantClient, request api.StreamRequest) error { message := "unknown request" if request.Type != "" { message = fmt.Sprintf("unknown request %s", request.Type) @@ -213,7 +213,7 @@ func (c *WebsocketClient) handleUnknownRequest(request websocketRequest) error { "message": message, "payload": request.Payload, } - c.Send(event.CreateEvent(event.EventTypeUnknown, m)) + client.Send(event.CreateEvent(event.EventTypeUnknown, m)) return nil } @@ -279,6 +279,30 @@ func (c *WebsocketClient) Send(ev event.Event) { } } +func (c *WebsocketClient) Receive() (api.StreamRequest, error) { + _, message, err := c.conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError( + err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNoStatusReceived, + ) { + c.logger.WithErr(err).Errorf("Unhandled websocket error") + } + return api.StreamRequest{}, errors.FatalStreamError(err) + } + + var request api.StreamRequest + if err := json.Unmarshal(message, &request); err != nil { + c.logger.WithErr(err).Errorf("Unmarshaling websocket message") + return api.StreamRequest{}, err + } + + return request, nil +} + +func (c *WebsocketClient) State() octant.State { + return c.state +} + // StopCh returns the client's stop channel. It will be closed when the WebsocketClient is closed. func (c *WebsocketClient) StopCh() <-chan struct{} { return c.stopCh @@ -287,8 +311,3 @@ func (c *WebsocketClient) StopCh() <-chan struct{} { func (c *WebsocketClient) RegisterHandler(handler octant.ClientRequestHandler) { c.handlers[handler.RequestType] = append(c.handlers[handler.RequestType], handler) } - -type websocketRequest struct { - Type string `json:"type"` - Payload action.Payload `json:"payload"` -} diff --git a/internal/api/websocket_client_test.go b/pkg/api/websockets/websocket_client_test.go similarity index 98% rename from internal/api/websocket_client_test.go rename to pkg/api/websockets/websocket_client_test.go index 046de4306e..ce50f7d04b 100644 --- a/internal/api/websocket_client_test.go +++ b/pkg/api/websockets/websocket_client_test.go @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package api +package websockets import ( "testing" diff --git a/pkg/api/websockets/websocket_connection_factory.go b/pkg/api/websockets/websocket_connection_factory.go new file mode 100644 index 0000000000..74c1cd58a4 --- /dev/null +++ b/pkg/api/websockets/websocket_connection_factory.go @@ -0,0 +1,81 @@ +package websockets + +import ( + "context" + "net" + "net/http" + + "github.com/google/uuid" + "github.com/gorilla/websocket" + + internalAPI "github.com/vmware-tanzu/octant/internal/api" + "github.com/vmware-tanzu/octant/internal/config" + "github.com/vmware-tanzu/octant/pkg/api" +) + +type WebsocketConnectionFactory struct { + upgrader websocket.Upgrader +} + +func NewWebsocketConnectionFactory() *WebsocketConnectionFactory { + return &WebsocketConnectionFactory{ + upgrader: websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + host, _, err := net.SplitHostPort(r.RemoteAddr) + if err != nil { + return false + } + + return internalAPI.ShouldAllowHost(host, internalAPI.AcceptedHosts()) + }, + }, + } +} + +var _ api.StreamingClientFactory = (*WebsocketConnectionFactory)(nil) + +func (wcf *WebsocketConnectionFactory) NewConnection( + w http.ResponseWriter, r *http.Request, m api.ClientManager, dashConfig config.Dash, +) (api.StreamingClient, context.CancelFunc, error) { + clientID, err := uuid.NewUUID() + if err != nil { + return nil, nil, err + } + + conn, err := wcf.upgrader.Upgrade(w, r, nil) + if err != nil { + return nil, nil, err + } + + ctx, cancel := context.WithCancel(m.Context()) + client := NewWebsocketClient(ctx, conn, m, dashConfig, m.ActionDispatcher(), clientID) + + go client.readPump() + go client.writePump() + + return client, cancel, nil +} + +func (wcf *WebsocketConnectionFactory) NewTemporaryConnection( + w http.ResponseWriter, r *http.Request, m api.ClientManager, +) (api.StreamingClient, context.CancelFunc, error) { + clientID, err := uuid.NewUUID() + if err != nil { + return nil, nil, err + } + + conn, err := wcf.upgrader.Upgrade(w, r, nil) + if err != nil { + return nil, nil, err + } + + ctx, cancel := context.WithCancel(m.Context()) + client := NewTemporaryWebsocketClient(ctx, conn, m, m.ActionDispatcher(), clientID) + + go client.readPump() + go client.writePump() + + return client, cancel, nil +} diff --git a/internal/api/websocket_state.go b/pkg/api/websockets/websocket_state.go similarity index 86% rename from internal/api/websocket_state.go rename to pkg/api/websockets/websocket_state.go index f0c91ea3f7..5fd68a9196 100644 --- a/internal/api/websocket_state.go +++ b/pkg/api/websockets/websocket_state.go @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package api +package websockets import ( "context" @@ -14,8 +14,10 @@ import ( "strings" "sync" + internalAPI "github.com/vmware-tanzu/octant/internal/api" "github.com/vmware-tanzu/octant/internal/util/path_util" + "github.com/vmware-tanzu/octant/pkg/api" "github.com/vmware-tanzu/octant/pkg/event" "github.com/google/uuid" @@ -25,42 +27,26 @@ import ( "github.com/vmware-tanzu/octant/pkg/action" ) -//go:generate mockgen -destination=./fake/mock_state_manager.go -package=fake github.com/vmware-tanzu/octant/internal/api StateManager -//go:generate mockgen -destination=./fake/mock_octant_client.go -package=fake github.com/vmware-tanzu/octant/internal/api OctantClient - var ( reContentPathNamespace = regexp.MustCompile(`^/namespace/(?P[^/]+)/?`) ) -// StateManager manages states for WebsocketState. -type StateManager interface { - Handlers() []octant.ClientRequestHandler - Start(ctx context.Context, state octant.State, s OctantClient) -} - -func defaultStateManagers(clientID string, dashConfig config.Dash) []StateManager { +func defaultStateManagers(clientID string, dashConfig config.Dash) []api.StateManager { logger := dashConfig.Logger().With("client-id", clientID) - return []StateManager{ - NewContentManager(dashConfig.ModuleManager(), dashConfig, logger), - NewHelperStateManager(dashConfig), - NewFilterManager(), - NewNavigationManager(dashConfig), - NewNamespacesManager(dashConfig), - NewContextManager(dashConfig), - NewActionRequestManager(dashConfig), - NewTerminalStateManager(dashConfig), - NewPodLogsStateManager(dashConfig), + return []api.StateManager{ + internalAPI.NewContentManager(dashConfig.ModuleManager(), dashConfig, logger), + internalAPI.NewHelperStateManager(dashConfig), + internalAPI.NewFilterManager(), + internalAPI.NewNavigationManager(dashConfig), + internalAPI.NewNamespacesManager(dashConfig), + internalAPI.NewContextManager(dashConfig), + internalAPI.NewActionRequestManager(dashConfig), + internalAPI.NewTerminalStateManager(dashConfig), + internalAPI.NewPodLogsStateManager(dashConfig), } } -// OctantClient is an OctantClient. -type OctantClient interface { - Send(event event.Event) - ID() string - StopCh() <-chan struct{} -} - type atomicString struct { mu sync.RWMutex s string @@ -91,7 +77,7 @@ func (s *atomicString) set(v string) { type WebsocketStateOption func(w *WebsocketState) // WebsocketStateManagers configures WebsocketState's state managers. -func WebsocketStateManagers(managers []StateManager) WebsocketStateOption { +func WebsocketStateManagers(managers []api.StateManager) WebsocketStateOption { return func(w *WebsocketState) { w.managers = managers } @@ -100,7 +86,7 @@ func WebsocketStateManagers(managers []StateManager) WebsocketStateOption { // WebsocketState manages state for a websocket client. type WebsocketState struct { dashConfig config.Dash - wsClient OctantClient + wsClient api.OctantClient contentPath *atomicString namespace *atomicString filters []octant.Filter @@ -108,8 +94,8 @@ type WebsocketState struct { namespaceUpdates map[string]octant.NamespaceUpdateFunc mu sync.RWMutex - managers []StateManager - actionDispatcher ActionDispatcher + managers []api.StateManager + actionDispatcher api.ActionDispatcher startCtx context.Context managersCancelFunc context.CancelFunc @@ -118,7 +104,7 @@ type WebsocketState struct { var _ octant.State = (*WebsocketState)(nil) // NewWebsocketState creates an instance of WebsocketState. -func NewWebsocketState(dashConfig config.Dash, actionDispatcher ActionDispatcher, wsClient OctantClient, options ...WebsocketStateOption) *WebsocketState { +func NewWebsocketState(dashConfig config.Dash, actionDispatcher api.ActionDispatcher, wsClient api.OctantClient, options ...WebsocketStateOption) *WebsocketState { defaultNamespace := dashConfig.DefaultNamespace() w := &WebsocketState{ @@ -143,7 +129,7 @@ func NewWebsocketState(dashConfig config.Dash, actionDispatcher ActionDispatcher return w } -func NewTemporaryWebsocketState(actionDispatcher ActionDispatcher, wsClient OctantClient, options ...WebsocketStateOption) *WebsocketState { +func NewTemporaryWebsocketState(actionDispatcher api.ActionDispatcher, wsClient api.OctantClient, options ...WebsocketStateOption) *WebsocketState { w := &WebsocketState{ wsClient: wsClient, contentPathUpdates: make(map[string]octant.ContentPathUpdateFunc), @@ -156,8 +142,8 @@ func NewTemporaryWebsocketState(actionDispatcher ActionDispatcher, wsClient Octa } if len(w.managers) < 1 { - w.managers = []StateManager{ - NewLoadingManager(), + w.managers = []api.StateManager{ + internalAPI.NewLoadingManager(), } } diff --git a/internal/api/websocket_state_test.go b/pkg/api/websockets/websocket_state_test.go similarity index 88% rename from internal/api/websocket_state_test.go rename to pkg/api/websockets/websocket_state_test.go index 739f4a95bf..83189746e7 100644 --- a/internal/api/websocket_state_test.go +++ b/pkg/api/websockets/websocket_state_test.go @@ -3,11 +3,10 @@ * SPDX-License-Identifier: Apache-2.0 */ -package api_test +package websockets_test import ( "context" - "sort" "testing" "time" @@ -15,12 +14,14 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/vmware-tanzu/octant/internal/api" - "github.com/vmware-tanzu/octant/internal/api/fake" + internalAPI "github.com/vmware-tanzu/octant/internal/api" configFake "github.com/vmware-tanzu/octant/internal/config/fake" "github.com/vmware-tanzu/octant/internal/log" moduleFake "github.com/vmware-tanzu/octant/internal/module/fake" "github.com/vmware-tanzu/octant/internal/octant" + "github.com/vmware-tanzu/octant/pkg/api" + "github.com/vmware-tanzu/octant/pkg/api/fake" + "github.com/vmware-tanzu/octant/pkg/api/websockets" ) func TestWebsocketState_Start(t *testing.T) { @@ -48,7 +49,7 @@ func TestWebsocketState_SetContentPath(t *testing.T) { contentPath string namespace string setup func(mocks *websocketStateMocks) - verify func(t *testing.T, s *api.WebsocketState) + verify func(t *testing.T, s *websockets.WebsocketState) }{ { name: "set content path without namespace change", @@ -60,7 +61,7 @@ func TestWebsocketState_SetContentPath(t *testing.T) { ModuleForContentPath(contentPath). Return(mocks.module, true) }, - verify: func(t *testing.T, s *api.WebsocketState) { + verify: func(t *testing.T, s *websockets.WebsocketState) { contentPath := "overview/namespace/default" assert.Equal(t, "default", s.GetNamespace()) assert.Equal(t, contentPath, s.GetContentPath()) @@ -76,7 +77,7 @@ func TestWebsocketState_SetContentPath(t *testing.T) { ModuleForContentPath(contentPath). Return(mocks.module, true) }, - verify: func(t *testing.T, s *api.WebsocketState) { + verify: func(t *testing.T, s *websockets.WebsocketState) { contentPath := "overview/foo" assert.Equal(t, "default", s.GetNamespace()) assert.Equal(t, contentPath, s.GetContentPath()) @@ -92,7 +93,7 @@ func TestWebsocketState_SetContentPath(t *testing.T) { ModuleForContentPath(contentPath). Return(mocks.module, true) }, - verify: func(t *testing.T, s *api.WebsocketState) { + verify: func(t *testing.T, s *websockets.WebsocketState) { contentPath := "overview/namespace/kube-system" assert.Equal(t, "kube-system", s.GetNamespace()) assert.Equal(t, contentPath, s.GetContentPath()) @@ -300,9 +301,9 @@ func newWebsocketStateMocks(t *testing.T, namespace string) *websocketStateMocks } } -func (w *websocketStateMocks) options() []api.WebsocketStateOption { - return []api.WebsocketStateOption{ - api.WebsocketStateManagers([]api.StateManager{w.stateManager}), +func (w *websocketStateMocks) options() []websockets.WebsocketStateOption { + return []websockets.WebsocketStateOption{ + websockets.WebsocketStateManagers([]internalAPI.StateManager{w.stateManager}), } } @@ -310,19 +311,7 @@ func (w *websocketStateMocks) finish() { w.controller.Finish() } -func (w *websocketStateMocks) factory() *api.WebsocketState { - return api.NewWebsocketState(w.dashConfig, w.actionDispatcher, w.wsClient, w.options()...) +func (w *websocketStateMocks) factory() *websockets.WebsocketState { + return websockets.NewWebsocketState(w.dashConfig, w.actionDispatcher, w.wsClient, w.options()...) } - -func AssertHandlers(t *testing.T, manager api.StateManager, expected []string) { - handlers := manager.Handlers() - var got []string - for _, h := range handlers { - got = append(got, h.RequestType) - } - sort.Strings(got) - sort.Strings(expected) - - assert.Equal(t, expected, got) -} diff --git a/pkg/dash/dash.go b/pkg/dash/dash.go index fd898d49fc..1105e8ebd9 100644 --- a/pkg/dash/dash.go +++ b/pkg/dash/dash.go @@ -28,7 +28,7 @@ import ( "github.com/spf13/viper" "go.opencensus.io/trace" - "github.com/vmware-tanzu/octant/internal/api" + internalAPI "github.com/vmware-tanzu/octant/internal/api" "github.com/vmware-tanzu/octant/internal/cluster" "github.com/vmware-tanzu/octant/internal/config" ocontext "github.com/vmware-tanzu/octant/internal/context" @@ -46,6 +46,8 @@ import ( "github.com/vmware-tanzu/octant/internal/objectstore" "github.com/vmware-tanzu/octant/internal/portforward" "github.com/vmware-tanzu/octant/pkg/action" + "github.com/vmware-tanzu/octant/pkg/api" + "github.com/vmware-tanzu/octant/pkg/api/websockets" "github.com/vmware-tanzu/octant/pkg/log" "github.com/vmware-tanzu/octant/pkg/octant" "github.com/vmware-tanzu/octant/pkg/plugin" @@ -71,6 +73,7 @@ type Options struct { Listener net.Listener clusterClient cluster.ClientInterface factory dynamicinformer.DynamicSharedInformerFactory + streamingClientFactory api.StreamingClientFactory } type RunnerOption struct { @@ -215,15 +218,23 @@ func WithClusterClient(client cluster.ClientInterface) RunnerOption { } } +func WithStreamingClientFactory(factory api.StreamingClientFactory) RunnerOption { + return RunnerOption{ + nonClusterOption: func(o *Options) { + o.streamingClientFactory = factory + }, + } +} + type Runner struct { - ctx context.Context - dash *dash - pluginManager *plugin.Manager - moduleManager *module.Manager - actionManager *action.Manager - websocketClientManager *api.WebsocketClientManager - apiCreated bool - fs afero.Fs + ctx context.Context + dash *dash + pluginManager *plugin.Manager + moduleManager *module.Manager + actionManager *action.Manager + streamingConnectionManager *api.StreamingConnectionManager + apiCreated bool + fs afero.Fs } func NewRunner(ctx context.Context, logger log.Logger, opts ...RunnerOption) (*Runner, error) { @@ -244,14 +255,19 @@ func NewRunner(ctx context.Context, logger log.Logger, opts ...RunnerOption) (*R actionManger := action.NewManager(logger) r.actionManager = actionManger - websocketClientManager := api.NewWebsocketClientManager(ctx, r.actionManager) - r.websocketClientManager = websocketClientManager - go websocketClientManager.Run(ctx) + var streamingConnectionManager *api.StreamingConnectionManager + if options.streamingClientFactory != nil { + streamingConnectionManager = api.NewStreamingConnectionManager(ctx, r.actionManager, options.streamingClientFactory) + } else { + streamingConnectionManager = api.NewStreamingConnectionManager(ctx, r.actionManager, websockets.NewWebsocketConnectionFactory()) + } + r.streamingConnectionManager = streamingConnectionManager + go streamingConnectionManager.Run(ctx) var err error var pluginService *pluginAPI.GRPCService - var apiService api.Service + var apiService internalAPI.Service var apiErr error r.fs = afero.NewOsFs() @@ -279,7 +295,7 @@ func NewRunner(ctx context.Context, logger log.Logger, opts ...RunnerOption) (*R return &r, nil } -func (r *Runner) apiFromKubeConfig(kubeConfig string, opts ...RunnerOption) (api.Service, *pluginAPI.GRPCService, error) { +func (r *Runner) apiFromKubeConfig(kubeConfig string, opts ...RunnerOption) (internalAPI.Service, *pluginAPI.GRPCService, error) { logger := internalLog.From(r.ctx) validKubeConfig, err := ValidateKubeConfig(logger, kubeConfig, r.fs) if err == nil { @@ -287,7 +303,7 @@ func (r *Runner) apiFromKubeConfig(kubeConfig string, opts ...RunnerOption) (api return r.initAPI(r.ctx, logger, opts...) } else { logger.Infof("no valid kube config found, initializing loading API") - return api.NewLoadingAPI(r.ctx, api.PathPrefix, r.actionManager, r.websocketClientManager, logger), nil, nil + return internalAPI.NewLoadingAPI(r.ctx, internalAPI.PathPrefix, r.actionManager, r.streamingConnectionManager, logger), nil, nil } } @@ -347,7 +363,7 @@ func (r *Runner) Start(startupCh, shutdownCh chan bool, opts ...RunnerOption) er return nil } -func (r *Runner) initAPI(ctx context.Context, logger log.Logger, opts ...RunnerOption) (*api.API, *pluginAPI.GRPCService, error) { +func (r *Runner) initAPI(ctx context.Context, logger log.Logger, opts ...RunnerOption) (*internalAPI.API, *pluginAPI.GRPCService, error) { kubeConfigOptions := []kubeconfig.KubeConfigOption{} options := Options{} for _, opt := range opts { @@ -439,10 +455,10 @@ func (r *Runner) initAPI(ctx context.Context, logger log.Logger, opts ...RunnerO PortForwarder: portForwarder, NamespaceInterface: nsClient, FrontendProxy: frontendProxy, - WebsocketClientManager: r.websocketClientManager, + WebsocketClientManager: r.streamingConnectionManager, } - pluginManager, err := initPlugin(moduleManager, r.actionManager, r.websocketClientManager, pluginDashboardService) + pluginManager, err := initPlugin(moduleManager, r.actionManager, r.streamingConnectionManager, pluginDashboardService) if err != nil { return nil, nil, fmt.Errorf("initializing plugin manager: %w", err) } @@ -501,7 +517,7 @@ func (r *Runner) initAPI(ctx context.Context, logger log.Logger, opts ...RunnerO return nil, nil, fmt.Errorf("unable to start CRD watcher: %w", err) } - apiService := api.New(ctx, api.PathPrefix, r.actionManager, r.websocketClientManager, dashConfig) + apiService := internalAPI.New(ctx, internalAPI.PathPrefix, r.actionManager, r.streamingConnectionManager, dashConfig) frontendProxy.FrontendUpdateController = apiService r.apiCreated = true @@ -615,7 +631,7 @@ type dash struct { browserPath string namespace string defaultHandler func() (http.Handler, error) - apiHandler api.Service + apiHandler internalAPI.Service willOpenBrowser bool logger log.Logger handlerFactory *octant.HandlerFactory @@ -623,7 +639,7 @@ type dash struct { pluginService pluginAPI.Service } -func newDash(listener net.Listener, namespace, uiURL string, browserPath string, apiHandler api.Service, pluginHandler pluginAPI.Service, logger log.Logger) (*dash, error) { +func newDash(listener net.Listener, namespace, uiURL string, browserPath string, apiHandler internalAPI.Service, pluginHandler pluginAPI.Service, logger log.Logger) (*dash, error) { hf := octant.NewHandlerFactory( octant.BackendHandler(apiHandler.Handler), octant.FrontendURL(viper.GetString("proxy-frontend"))) @@ -643,7 +659,7 @@ func newDash(listener net.Listener, namespace, uiURL string, browserPath string, }, nil } -func (d *dash) SetAPIService(ctx context.Context, apiService api.Service) error { +func (d *dash) SetAPIService(ctx context.Context, apiService internalAPI.Service) error { d.apiHandler = apiService hf := octant.NewHandlerFactory( octant.BackendHandler(d.apiHandler.Handler), diff --git a/pkg/errors/streaming_connection_error.go b/pkg/errors/streaming_connection_error.go new file mode 100644 index 0000000000..0a87c699fc --- /dev/null +++ b/pkg/errors/streaming_connection_error.go @@ -0,0 +1,39 @@ +package errors + +import "github.com/vmware-tanzu/octant/internal/errors" + +type StreamError struct { + *errors.GenericError + Fatal bool +} + +func NewStreamError(err error) *StreamError { + return &StreamError{ + errors.NewGenericError(err), + false, + } +} + +func FatalStreamError(err error) *StreamError { + return &StreamError{ + errors.NewGenericError(err), + true, + } +} + +func IsFatalStreamError(err error) bool { + switch sErr := err.(type) { + case StreamError: + return sErr.Fatal + case *StreamError: + return sErr.Fatal + default: + return false + } +} + +const StreamingConnectionError = "StreamingConnectionError" + +func (s *StreamError) Name() string { + return StreamingConnectionError +} diff --git a/pkg/errors/streaming_connection_error_test.go b/pkg/errors/streaming_connection_error_test.go new file mode 100644 index 0000000000..24c8293125 --- /dev/null +++ b/pkg/errors/streaming_connection_error_test.go @@ -0,0 +1,65 @@ +package errors + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestConstructors(t *testing.T) { + err := fmt.Errorf("encountered an error while streaming") + table := []struct { + name string + sErr *StreamError + fatal bool + }{ + {"NewStreamError", NewStreamError(err), false}, + {"FatalStreamError", FatalStreamError(err), true}, + } + + for _, test := range table { + t.Run(test.name, func(t *testing.T) { + assert.NotEmpty(t, test.sErr.Timestamp()) + assert.Equal(t, test.sErr.Name(), StreamingConnectionError) + assert.NotZero(t, test.sErr.ID()) + assert.Equal(t, test.fatal, test.sErr.Fatal) + assert.Equal(t, err.Error(), test.sErr.Error()) + }) + } +} + +func TestIsFatalStreamError(t *testing.T) { + table := []struct { + name string + sErr error + expected bool + }{ + { + "Expect IsFatalStreamError to be false for a standard stream error", + NewStreamError(fmt.Errorf("")), + false, + }, + { + "Expect IsFatalStreamError to be true for fatal stream error", + FatalStreamError(fmt.Errorf("")), + true, + }, + { + "A non pointer fatal stream error is provided", + *FatalStreamError(fmt.Errorf("")), + true, + }, + { + "A generic error is provided", + fmt.Errorf(""), + false, + }, + } + + for _, test := range table { + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.expected, IsFatalStreamError(test.sErr)) + }) + } +}