Skip to content

Commit

Permalink
Move observability gRPC from linkerd-controller to viz
Browse files Browse the repository at this point in the history
This is based on the branch `alpeb/no-more-controller-prom` from #5556

We continue removing the linkerd-controller dependencies on viz stuff, except for things required for the tap server, which can be tackled separately afterwards.

- Created a new gRPC server under `viz/metrics-api` moving prometheus-dependent functions out of the core gRPC server and into it (same thing for the accompaigning http server).
- Did the same for the `PublicAPIClient` (now called just `Client`) interface. The `VizAPIClient` interface disappears as it's enough to just rely on the viz `ApiClient` protobuf type.
- Moved the other files implementing the rest of the gRPC functions from `controller/api/public` to `viz/metrics-api` (`edge.go`, `stat_summary.go`, etc.).
- Also simplified some type names to avoid stuttering.

Tests are not supposed to pass in this branch.
  • Loading branch information
alpeb committed Jan 18, 2021
1 parent 921527b commit b8e19a8
Show file tree
Hide file tree
Showing 25 changed files with 2,112 additions and 1,649 deletions.
99 changes: 11 additions & 88 deletions controller/api/public/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,12 @@ import (

"github.com/golang/protobuf/proto"
destinationPb "github.com/linkerd/linkerd2-proxy-api/go/destination"
healthcheckPb "github.com/linkerd/linkerd2/controller/gen/common/healthcheck"
publicPb "github.com/linkerd/linkerd2/controller/gen/public"
"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/protohttp"
pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
log "github.com/sirupsen/logrus"
"go.opencensus.io/plugin/ochttp"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
Expand All @@ -31,81 +27,24 @@ const (
apiDeployment = "linkerd-controller"
)

// PublicAPIClient wraps one gRPC client interface for publicPb.Api:
// TODO: remove stuttering name when viz api code moves to /viz
// nolint
type PublicAPIClient interface {
// Client wraps one gRPC client interface for publicPb.Api:
type Client interface {
publicPb.ApiClient
destinationPb.DestinationClient
}

// VizAPIClient wraps one gRPC client interface for pb.Api:
type VizAPIClient interface {
pb.ApiClient
}

type grpcOverHTTPClient struct {
serverURL *url.URL
httpClient *http.Client
controlPlaneNamespace string
}

func (c *grpcOverHTTPClient) StatSummary(ctx context.Context, req *pb.StatSummaryRequest, _ ...grpc.CallOption) (*pb.StatSummaryResponse, error) {
var msg pb.StatSummaryResponse
err := c.apiRequest(ctx, "StatSummary", req, &msg)
return &msg, err
}

func (c *grpcOverHTTPClient) Edges(ctx context.Context, req *pb.EdgesRequest, _ ...grpc.CallOption) (*pb.EdgesResponse, error) {
var msg pb.EdgesResponse
err := c.apiRequest(ctx, "Edges", req, &msg)
return &msg, err
}

func (c *grpcOverHTTPClient) TopRoutes(ctx context.Context, req *pb.TopRoutesRequest, _ ...grpc.CallOption) (*pb.TopRoutesResponse, error) {
var msg pb.TopRoutesResponse
err := c.apiRequest(ctx, "TopRoutes", req, &msg)
return &msg, err
}

func (c *grpcOverHTTPClient) Gateways(ctx context.Context, req *pb.GatewaysRequest, _ ...grpc.CallOption) (*pb.GatewaysResponse, error) {
var msg pb.GatewaysResponse
err := c.apiRequest(ctx, "Gateways", req, &msg)
return &msg, err
}

func (c *grpcOverHTTPClient) Version(ctx context.Context, req *publicPb.Empty, _ ...grpc.CallOption) (*publicPb.VersionInfo, error) {
var msg publicPb.VersionInfo
err := c.apiRequest(ctx, "Version", req, &msg)
return &msg, err
}

func (c *grpcOverHTTPClient) SelfCheck(ctx context.Context, req *healthcheckPb.SelfCheckRequest, _ ...grpc.CallOption) (*healthcheckPb.SelfCheckResponse, error) {
var msg healthcheckPb.SelfCheckResponse
err := c.apiRequest(ctx, "SelfCheck", req, &msg)
return &msg, err
}

func (c *grpcOverHTTPClient) ListPods(ctx context.Context, req *pb.ListPodsRequest, _ ...grpc.CallOption) (*pb.ListPodsResponse, error) {
var msg pb.ListPodsResponse
err := c.apiRequest(ctx, "ListPods", req, &msg)
return &msg, err
}

func (c *grpcOverHTTPClient) ListServices(ctx context.Context, req *pb.ListServicesRequest, _ ...grpc.CallOption) (*pb.ListServicesResponse, error) {
var msg pb.ListServicesResponse
err := c.apiRequest(ctx, "ListServices", req, &msg)
return &msg, err
}

func (c *grpcOverHTTPClient) Tap(ctx context.Context, req *pb.TapRequest, _ ...grpc.CallOption) (pb.Api_TapClient, error) {
return nil, status.Error(codes.Unimplemented, "Tap is deprecated in public API, use tap APIServer")
}

func (c *grpcOverHTTPClient) TapByResource(ctx context.Context, req *pb.TapByResourceRequest, _ ...grpc.CallOption) (pb.Api_TapByResourceClient, error) {
return nil, status.Error(codes.Unimplemented, "Tap is deprecated in public API, use tap APIServer")
}

func (c *grpcOverHTTPClient) Get(ctx context.Context, req *destinationPb.GetDestination, _ ...grpc.CallOption) (destinationPb.Destination_GetClient, error) {
url := c.endpointNameToPublicAPIURL("DestinationGet")
httpRsp, err := c.post(ctx, url, req)
Expand Down Expand Up @@ -184,7 +123,7 @@ func (c destinationClient) Recv() (*destinationPb.Update, error) {
return &msg, err
}

func newClient(apiURL *url.URL, httpClientToUse *http.Client, controlPlaneNamespace string) (VizAPIClient, error) {
func newClient(apiURL *url.URL, httpClientToUse *http.Client, controlPlaneNamespace string) (Client, error) {
if !apiURL.IsAbs() {
return nil, fmt.Errorf("server URL must be absolute, was [%s]", apiURL.String())
}
Expand All @@ -200,17 +139,9 @@ func newClient(apiURL *url.URL, httpClientToUse *http.Client, controlPlaneNamesp
}, nil
}

func newPublicClient(apiURL *url.URL, httpClientToUse *http.Client, controlPlaneNamespace string) (PublicAPIClient, error) {
client, err := newClient(apiURL, httpClientToUse, controlPlaneNamespace)
if err != nil {
return nil, err
}
return client.(PublicAPIClient), nil
}

// NewInternalClient creates a new Viz API client intended to run inside a
// NewInternalClient creates a new public API client intended to run inside a
// Kubernetes cluster.
func NewInternalClient(controlPlaneNamespace string, kubeAPIHost string) (VizAPIClient, error) {
func NewInternalClient(controlPlaneNamespace string, kubeAPIHost string) (Client, error) {
apiURL, err := url.Parse(fmt.Sprintf("http://%s/", kubeAPIHost))
if err != nil {
return nil, err
Expand All @@ -221,17 +152,13 @@ func NewInternalClient(controlPlaneNamespace string, kubeAPIHost string) (VizAPI

// NewInternalPublicClient creates a new Public API client intended to run inside a
// Kubernetes cluster.
func NewInternalPublicClient(controlPlaneNamespace string, kubeAPIHost string) (PublicAPIClient, error) {
client, err := NewInternalClient(controlPlaneNamespace, kubeAPIHost)
if err != nil {
return nil, err
}
return client.(PublicAPIClient), nil
func NewInternalPublicClient(controlPlaneNamespace string, kubeAPIHost string) (Client, error) {
return NewInternalClient(controlPlaneNamespace, kubeAPIHost)
}

// NewExternalClient creates a new Viz API client intended to run from
// NewExternalClient creates a new public API client intended to run from
// outside a Kubernetes cluster.
func NewExternalClient(ctx context.Context, controlPlaneNamespace string, kubeAPI *k8s.KubernetesAPI) (VizAPIClient, error) {
func NewExternalClient(ctx context.Context, controlPlaneNamespace string, kubeAPI *k8s.KubernetesAPI) (Client, error) {
portforward, err := k8s.NewPortForward(
ctx,
kubeAPI,
Expand Down Expand Up @@ -265,10 +192,6 @@ func NewExternalClient(ctx context.Context, controlPlaneNamespace string, kubeAP

// NewExternalPublicClient creates a new Public API client intended to run from
// outside a Kubernetes cluster.
func NewExternalPublicClient(ctx context.Context, controlPlaneNamespace string, kubeAPI *k8s.KubernetesAPI) (PublicAPIClient, error) {
client, err := NewExternalClient(ctx, controlPlaneNamespace, kubeAPI)
if err != nil {
return nil, err
}
return client.(PublicAPIClient), nil
func NewExternalPublicClient(ctx context.Context, controlPlaneNamespace string, kubeAPI *k8s.KubernetesAPI) (Client, error) {
return NewExternalClient(ctx, controlPlaneNamespace, kubeAPI)
}
2 changes: 1 addition & 1 deletion controller/api/public/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestNewInternalClient(t *testing.T) {
Host: "some-hostname",
Path: "/",
}
client, err := newPublicClient(apiURL, mockHTTPClient, "linkerd")
client, err := newClient(apiURL, mockHTTPClient, "linkerd")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down
Loading

0 comments on commit b8e19a8

Please sign in to comment.