diff --git a/internal/api/streaming_connection_manager.go b/internal/api/streaming_connection_manager.go index a50bd85bcd..8b03151762 100644 --- a/internal/api/streaming_connection_manager.go +++ b/internal/api/streaming_connection_manager.go @@ -96,11 +96,10 @@ func (m *StreamingConnectionManager) Clients() []StreamingClient { // Run runs the manager. It manages multiple websocket clients. func (m *StreamingConnectionManager) Run(ctx context.Context) { - done := false - for !done { + for { select { case <-ctx.Done(): - done = true + return case meta := <-m.register: m.clients[meta.client] = meta.cancelFunc case client := <-m.unregister: diff --git a/internal/api/websocket_client.go b/internal/api/websocket_client.go index a12f7b86dc..c43be841ab 100644 --- a/internal/api/websocket_client.go +++ b/internal/api/websocket_client.go @@ -174,7 +174,7 @@ func (c *WebsocketClient) readPump() { continue } - if err := HandleStreamingMessage(c, request); err != nil { + if err := handleStreamingMessage(c, request); err != nil { c.logger.WithErr(err).Errorf("Handle websocket message") } } @@ -182,7 +182,7 @@ func (c *WebsocketClient) readPump() { close(c.stopCh) } -func HandleStreamingMessage(client StreamingClient, request StreamRequest) error { +func handleStreamingMessage(client StreamingClient, request StreamRequest) error { handlers, ok := client.Handlers()[request.Type] if !ok { return handleUnknownRequest(client, request)