From 7507ea6bcbef3e25822b2bedccdc923ad1134b59 Mon Sep 17 00:00:00 2001 From: Zach Reyes <39203661+zasweq@users.noreply.github.com> Date: Tue, 14 Mar 2023 20:20:09 -0400 Subject: [PATCH] gcp/observability: Change logging schema and set queue size limit for logs and batching delay (#6118) --- gcp/observability/exporting.go | 3 ++- gcp/observability/logging.go | 25 ++++++++++++++----------- gcp/observability/logging_test.go | 28 ++++++++++++++++++---------- 3 files changed, 34 insertions(+), 22 deletions(-) diff --git a/gcp/observability/exporting.go b/gcp/observability/exporting.go index 862014640deb..3c27b3533e04 100644 --- a/gcp/observability/exporting.go +++ b/gcp/observability/exporting.go @@ -21,6 +21,7 @@ package observability import ( "context" "fmt" + "time" "google.golang.org/api/option" "google.golang.org/grpc" @@ -72,7 +73,7 @@ func newCloudLoggingExporter(ctx context.Context, config *config) (loggingExport return &cloudLoggingExporter{ projectID: config.ProjectID, client: c, - logger: c.Logger("microservices.googleapis.com/observability/grpc", gcplogging.CommonLabels(config.Labels)), + logger: c.Logger("microservices.googleapis.com/observability/grpc", gcplogging.CommonLabels(config.Labels), gcplogging.BufferedByteLimit(1024*1024*50), gcplogging.DelayThreshold(time.Second*10)), }, nil } diff --git a/gcp/observability/logging.go b/gcp/observability/logging.go index 7be05975146c..b04c60ba0430 100644 --- a/gcp/observability/logging.go +++ b/gcp/observability/logging.go @@ -33,6 +33,7 @@ import ( "google.golang.org/grpc" binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1" + "google.golang.org/grpc/codes" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/binarylog" iblog "google.golang.org/grpc/internal/binarylog" @@ -44,6 +45,8 @@ var lExporter loggingExporter var newLoggingExporter = newCloudLoggingExporter +var canonicalString = internal.CanonicalString.(func(codes.Code) string) + // translateMetadata translates the metadata from Binary Logging format to // its GrpcLogEntry equivalent. func translateMetadata(m *binlogpb.Metadata) map[string]string { @@ -153,7 +156,7 @@ type payload struct { // Timeout is the RPC timeout value. Timeout time.Duration `json:"timeout,omitempty"` // StatusCode is the gRPC status code. - StatusCode uint32 `json:"statusCode,omitempty"` + StatusCode string `json:"statusCode,omitempty"` // StatusMessage is the gRPC status message. StatusMessage string `json:"statusMessage,omitempty"` // StatusDetails is the value of the grpc-status-details-bin metadata key, @@ -170,9 +173,9 @@ type addrType int const ( typeUnknown addrType = iota // `json:"TYPE_UNKNOWN"` - typeIPv4 // `json:"TYPE_IPV4"` - typeIPv6 // `json:"TYPE_IPV6"` - typeUnix // `json:"TYPE_UNIX"` + ipv4 // `json:"IPV4"` + ipv6 // `json:"IPV6"` + unix // `json:"UNIX"` ) func (at addrType) MarshalJSON() ([]byte, error) { @@ -180,12 +183,12 @@ func (at addrType) MarshalJSON() ([]byte, error) { switch at { case typeUnknown: buffer.WriteString("TYPE_UNKNOWN") - case typeIPv4: - buffer.WriteString("TYPE_IPV4") - case typeIPv6: - buffer.WriteString("TYPE_IPV6") - case typeUnix: - buffer.WriteString("TYPE_UNIX") + case ipv4: + buffer.WriteString("IPV4") + case ipv6: + buffer.WriteString("IPV6") + case unix: + buffer.WriteString("UNIX") } buffer.WriteString(`"`) return buffer.Bytes(), nil @@ -303,7 +306,7 @@ func (bml *binaryMethodLogger) buildGCPLoggingEntry(ctx context.Context, c iblog case binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER: grpcLogEntry.Type = eventTypeServerTrailer grpcLogEntry.Payload.Metadata = translateMetadata(binLogEntry.GetTrailer().Metadata) - grpcLogEntry.Payload.StatusCode = binLogEntry.GetTrailer().GetStatusCode() + grpcLogEntry.Payload.StatusCode = canonicalString(codes.Code(binLogEntry.GetTrailer().GetStatusCode())) grpcLogEntry.Payload.StatusMessage = binLogEntry.GetTrailer().GetStatusMessage() grpcLogEntry.Payload.StatusDetails = binLogEntry.GetTrailer().GetStatusDetails() grpcLogEntry.PayloadTruncated = binLogEntry.GetPayloadTruncated() diff --git a/gcp/observability/logging_test.go b/gcp/observability/logging_test.go index a42b1da550fd..e76ba386d235 100644 --- a/gcp/observability/logging_test.go +++ b/gcp/observability/logging_test.go @@ -231,7 +231,8 @@ func (s) TestClientRPCEventsLogAll(t *testing.T) { SequenceID: 5, Authority: ss.Address, Payload: payload{ - Metadata: map[string]string{}, + Metadata: map[string]string{}, + StatusCode: "OK", }, }, } @@ -319,7 +320,8 @@ func (s) TestClientRPCEventsLogAll(t *testing.T) { Authority: ss.Address, SequenceID: 6, Payload: payload{ - Metadata: map[string]string{}, + Metadata: map[string]string{}, + StatusCode: "OK", }, }, } @@ -438,7 +440,8 @@ func (s) TestServerRPCEventsLogAll(t *testing.T) { SequenceID: 5, Authority: ss.Address, Payload: payload{ - Metadata: map[string]string{}, + Metadata: map[string]string{}, + StatusCode: "OK", }, }, } @@ -525,7 +528,8 @@ func (s) TestServerRPCEventsLogAll(t *testing.T) { Authority: ss.Address, SequenceID: 6, Payload: payload{ - Metadata: map[string]string{}, + Metadata: map[string]string{}, + StatusCode: "OK", }, }, } @@ -745,7 +749,8 @@ func (s) TestClientRPCEventsTruncateHeaderAndMetadata(t *testing.T) { SequenceID: 5, Authority: ss.Address, Payload: payload{ - Metadata: map[string]string{}, + Metadata: map[string]string{}, + StatusCode: "OK", }, }, } @@ -892,7 +897,8 @@ func (s) TestPrecedenceOrderingInConfiguration(t *testing.T) { SequenceID: 5, Authority: ss.Address, Payload: payload{ - Metadata: map[string]string{}, + Metadata: map[string]string{}, + StatusCode: "OK", }, }, } @@ -959,7 +965,8 @@ func (s) TestPrecedenceOrderingInConfiguration(t *testing.T) { Authority: ss.Address, SequenceID: 3, Payload: payload{ - Metadata: map[string]string{}, + Metadata: map[string]string{}, + StatusCode: "OK", }, }, } @@ -1080,14 +1087,14 @@ func (s) TestMarshalJSON(t *testing.T) { Payload: payload{ Metadata: map[string]string{"header1": "value1"}, Timeout: 20, - StatusCode: 3, + StatusCode: "UNKNOWN", StatusMessage: "ok", StatusDetails: []byte("ok"), MessageLength: 3, Message: []byte("wow"), }, Peer: address{ - Type: typeIPv4, + Type: ipv4, Address: "localhost", IPPort: 16000, }, @@ -1214,7 +1221,8 @@ func (s) TestMetadataTruncationAccountsKey(t *testing.T) { SequenceID: 5, Authority: ss.Address, Payload: payload{ - Metadata: map[string]string{}, + Metadata: map[string]string{}, + StatusCode: "OK", }, }, }