From 12dc4036f7fa40a074d11cf13a7a83e6b472348a Mon Sep 17 00:00:00 2001 From: Dmitry Date: Tue, 28 Sep 2021 00:26:49 -0700 Subject: [PATCH] [k8sattributes processor] Add optional container metadata This change provides an option to fetch container metadata from k8s API in addition to k8s pod metadata. The following attributes now can be automatically added by the k8sattributes processor: - container.image.name - container.image.tag - container.id `container.image.name` and `container.image.tag` require additional container identifier present in resource attributes: `container.name`. `container.id` requires additional container run identifiers present in resource attributes: `container.name` and `run_id`. `run_id` identified is a subject to change, see /~https://github.com/open-telemetry/opentelemetry-specification/pull/1945 --- processor/k8sattributesprocessor/doc.go | 23 +- .../k8sattributesprocessor/kube/client.go | 49 +++++ .../kube/client_test.go | 160 ++++++++++++++ processor/k8sattributesprocessor/kube/kube.go | 34 ++- processor/k8sattributesprocessor/options.go | 6 + processor/k8sattributesprocessor/processor.go | 61 +++++- .../k8sattributesprocessor/processor_test.go | 197 ++++++++++++++++++ 7 files changed, 515 insertions(+), 15 deletions(-) diff --git a/processor/k8sattributesprocessor/doc.go b/processor/k8sattributesprocessor/doc.go index 558a6abcbdb4..ecede275d360 100644 --- a/processor/k8sattributesprocessor/doc.go +++ b/processor/k8sattributesprocessor/doc.go @@ -40,7 +40,28 @@ // // If Pod association rules are not configured resources are associated with metadata only by connection's IP Address. // -// +// Which metadata to collect is determined by `metadata` configuration that defines list of resource attributes +// to be added. Items in the list called exactly the same as the resource attributes that will be added. +// The following list of attributes is enabled by default if `metadata` configuration is not specified: +// - k8s.namespace.name +// - k8s.pod.name +// - k8s.pod.uid +// - k8s.pod.start_time +// - k8s.deployment.name +// - k8s.cluster.name +// - k8s.node.name +// Not all the attributes are guaranteed to be added. For example `k8s.cluster.name` usually is not provided by k8s API, +// so likely it won't be set as an attribute. + +// The following attributes are not included by default, but can be enabled with `metadata` config option. +// 1. Container spec attributes - will be set only if container identifying attribute `container.name` is set +// as a resource attributes (similar to all other attributes, pod has to be identified as well): +// - container.image.name +// - container.image.tag +// 2. Container status attributes - in addition to pod identifier and `container.name` attribute, these attributes +// require identifier of a particular container run set as `run_id` in resource attributes: +// - container.id + //The k8sattributesprocessor can be used for automatic tagging of spans, metrics and logs with k8s labels and annotations from pods and namespaces. //The config for associating the data passing through the processor (spans, metrics and logs) with specific Pod/Namespace annotations/labels is configured via "annotations" and "labels" keys. //This config represents a list of annotations/labels that are extracted from pods/namespaces and added to spans, metrics and logs. diff --git a/processor/k8sattributesprocessor/kube/client.go b/processor/k8sattributesprocessor/kube/client.go index d623525d3b7a..5f2801f61cfb 100644 --- a/processor/k8sattributesprocessor/kube/client.go +++ b/processor/k8sattributesprocessor/kube/client.go @@ -330,6 +330,48 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string { return tags } +func (c *WatchClient) extractPodContainersAttributes(pod *api_v1.Pod) map[string]*Container { + containers := map[string]*Container{} + + if c.Rules.ContainerImageName || c.Rules.ContainerImageTag { + for _, spec := range append(pod.Spec.Containers, pod.Spec.InitContainers...) { + container := &Container{} + imageParts := strings.Split(spec.Image, ":") + if c.Rules.ContainerImageName { + container.ImageName = imageParts[0] + } + if c.Rules.ContainerImageTag && len(imageParts) > 1 { + container.ImageTag = imageParts[1] + } + containers[spec.Name] = container + } + } + + if c.Rules.ContainerID { + for _, apiStatus := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) { + container, ok := containers[apiStatus.Name] + if !ok { + container = &Container{} + containers[apiStatus.Name] = container + } + if container.Statuses == nil { + container.Statuses = map[int]ContainerStatus{} + } + + containerID := apiStatus.ContainerID + + // Remove container runtime prefix + idParts := strings.Split(containerID, "://") + if len(idParts) == 2 { + containerID = idParts[1] + } + + container.Statuses[int(apiStatus.RestartCount)] = ContainerStatus{containerID} + } + } + return containers +} + func (c *WatchClient) extractNamespaceAttributes(namespace *api_v1.Namespace) map[string]string { tags := map[string]string{} @@ -378,6 +420,9 @@ func (c *WatchClient) addOrUpdatePod(pod *api_v1.Pod) { newPod.Ignore = true } else { newPod.Attributes = c.extractPodAttributes(pod) + if needContainerAttributes(c.Rules) { + newPod.Containers = c.extractPodContainersAttributes(pod) + } } c.m.Lock() @@ -513,3 +558,7 @@ func (c *WatchClient) extractNamespaceLabelsAnnotations() bool { return false } + +func needContainerAttributes(rules ExtractionRules) bool { + return rules.ContainerImageName || rules.ContainerImageTag || rules.ContainerID +} diff --git a/processor/k8sattributesprocessor/kube/client_test.go b/processor/k8sattributesprocessor/kube/client_test.go index c8e1ebe5f654..f57686b5bf4e 100644 --- a/processor/k8sattributesprocessor/kube/client_test.go +++ b/processor/k8sattributesprocessor/kube/client_test.go @@ -747,6 +747,166 @@ func TestPodIgnorePatterns(t *testing.T) { } } +func Test_extractPodContainersAttributes(t *testing.T) { + pod := api_v1.Pod{ + Spec: api_v1.PodSpec{ + Containers: []api_v1.Container{ + { + Name: "container1", + Image: "test/image1:0.1.0", + }, + { + Name: "container2", + Image: "test/image2:0.2.0", + }, + }, + InitContainers: []api_v1.Container{ + { + Name: "init_container", + Image: "test/init-image:1.0.2", + }, + }, + }, + Status: api_v1.PodStatus{ + ContainerStatuses: []api_v1.ContainerStatus{ + { + Name: "container1", + ContainerID: "docker://container1-id-123", + RestartCount: 0, + }, + { + Name: "container2", + ContainerID: "docker://container2-id-456", + RestartCount: 2, + }, + }, + InitContainerStatuses: []api_v1.ContainerStatus{ + { + Name: "init_container", + ContainerID: "containerd://init-container-id-123", + RestartCount: 0, + }, + }, + }, + } + tests := []struct { + name string + rules ExtractionRules + pod api_v1.Pod + want map[string]*Container + }{ + { + name: "no-data", + rules: ExtractionRules{ + ContainerImageName: true, + ContainerImageTag: true, + ContainerID: true, + }, + pod: api_v1.Pod{}, + want: map[string]*Container{}, + }, + { + name: "no-rules", + rules: ExtractionRules{}, + pod: pod, + want: map[string]*Container{}, + }, + { + name: "image-name-only", + rules: ExtractionRules{ + ContainerImageName: true, + }, + pod: pod, + want: map[string]*Container{ + "container1": {ImageName: "test/image1"}, + "container2": {ImageName: "test/image2"}, + "init_container": {ImageName: "test/init-image"}, + }, + }, + { + name: "no-image-tag-available", + rules: ExtractionRules{ + ContainerImageName: true, + }, + pod: api_v1.Pod{ + Spec: api_v1.PodSpec{ + Containers: []api_v1.Container{ + { + Name: "test-container", + Image: "test/image", + }, + }, + }, + }, + want: map[string]*Container{ + "test-container": {ImageName: "test/image"}, + }, + }, + { + name: "container-id-only", + rules: ExtractionRules{ + ContainerID: true, + }, + pod: pod, + want: map[string]*Container{ + "container1": { + Statuses: map[int]ContainerStatus{ + 0: {ContainerID: "container1-id-123"}, + }, + }, + "container2": { + Statuses: map[int]ContainerStatus{ + 2: {ContainerID: "container2-id-456"}, + }, + }, + "init_container": { + Statuses: map[int]ContainerStatus{ + 0: {ContainerID: "init-container-id-123"}, + }, + }, + }, + }, + { + name: "all-container-attributes", + rules: ExtractionRules{ + ContainerImageName: true, + ContainerImageTag: true, + ContainerID: true, + }, + pod: pod, + want: map[string]*Container{ + "container1": { + ImageName: "test/image1", + ImageTag: "0.1.0", + Statuses: map[int]ContainerStatus{ + 0: {ContainerID: "container1-id-123"}, + }, + }, + "container2": { + ImageName: "test/image2", + ImageTag: "0.2.0", + Statuses: map[int]ContainerStatus{ + 2: {ContainerID: "container2-id-456"}, + }, + }, + "init_container": { + ImageName: "test/init-image", + ImageTag: "1.0.2", + Statuses: map[int]ContainerStatus{ + 0: {ContainerID: "init-container-id-123"}, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := WatchClient{Rules: tt.rules} + assert.Equal(t, tt.want, c.extractPodContainersAttributes(&tt.pod)) + }) + } +} + func Test_extractField(t *testing.T) { c := WatchClient{} type args struct { diff --git a/processor/k8sattributesprocessor/kube/kube.go b/processor/k8sattributesprocessor/kube/kube.go index dd981746e1ef..80b938177b59 100644 --- a/processor/k8sattributesprocessor/kube/kube.go +++ b/processor/k8sattributesprocessor/kube/kube.go @@ -71,9 +71,26 @@ type Pod struct { Ignore bool Namespace string + // Containers is a map of container name to Container struct. + Containers map[string]*Container + DeletedAt time.Time } +// Container stores resource attributes for a specific container defined by k8s pod spec. +type Container struct { + ImageName string + ImageTag string + + // Statuses is a map of container run_id (restart count) attribute to ContainerStatus struct. + Statuses map[int]ContainerStatus +} + +// ContainerStatus stores resource attributes for a particular container run defined by k8s pod status. +type ContainerStatus struct { + ContainerID string +} + // Namespace represents a kubernetes namespace. type Namespace struct { Name string @@ -118,13 +135,16 @@ type FieldFilter struct { // ExtractionRules is used to specify the information that needs to be extracted // from pods and added to the spans as tags. type ExtractionRules struct { - Deployment bool - Namespace bool - PodName bool - PodUID bool - Node bool - Cluster bool - StartTime bool + Deployment bool + Namespace bool + PodName bool + PodUID bool + Node bool + Cluster bool + StartTime bool + ContainerID bool + ContainerImageName bool + ContainerImageTag bool Annotations []FieldExtractionRule Labels []FieldExtractionRule diff --git a/processor/k8sattributesprocessor/options.go b/processor/k8sattributesprocessor/options.go index 71178b781c4a..33a4a14955e8 100644 --- a/processor/k8sattributesprocessor/options.go +++ b/processor/k8sattributesprocessor/options.go @@ -99,6 +99,12 @@ func WithExtractMetadata(fields ...string) Option { p.rules.Cluster = true case metadataNode, conventions.AttributeK8SNodeName: p.rules.Node = true + case conventions.AttributeContainerID: + p.rules.ContainerID = true + case conventions.AttributeContainerImageName: + p.rules.ContainerImageName = true + case conventions.AttributeContainerImageTag: + p.rules.ContainerImageTag = true default: return fmt.Errorf("\"%s\" is not a supported metadata field", field) } diff --git a/processor/k8sattributesprocessor/processor.go b/processor/k8sattributesprocessor/processor.go index cb60769ad57c..2bd19b8ca591 100644 --- a/processor/k8sattributesprocessor/processor.go +++ b/processor/k8sattributesprocessor/processor.go @@ -16,6 +16,8 @@ package k8sattributesprocessor import ( "context" + "fmt" + "strconv" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/model/pdata" @@ -29,6 +31,10 @@ import ( const ( k8sIPLabelName string = "k8s.pod.ip" clientIPLabelName string = "ip" + + // TODO: update the label to semantic convention defined in this PR: + // /~https://github.com/open-telemetry/opentelemetry-specification/pull/1945 + k8sContainerRunIDLabelName string = "run_id" ) type kubernetesprocessor struct { @@ -117,9 +123,11 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pda } if podIdentifierKey != "" { - attrsToAdd := kp.getAttributesForPod(podIdentifierValue) - for key, val := range attrsToAdd { - resource.Attributes().InsertString(key, val) + if pod, ok := kp.kc.GetPod(podIdentifierValue); ok { + for key, val := range pod.Attributes { + resource.Attributes().InsertString(key, val) + } + kp.addContainerAttributes(resource.Attributes(), pod) } } @@ -131,12 +139,35 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pda } } -func (kp *kubernetesprocessor) getAttributesForPod(identifier kube.PodIdentifier) map[string]string { - pod, ok := kp.kc.GetPod(identifier) +// addContainerAttributes looks if pod has any container identifiers and adds additional container attributes +func (kp *kubernetesprocessor) addContainerAttributes(attrs pdata.AttributeMap, pod *kube.Pod) { + containerName := stringAttributeFromMap(attrs, conventions.AttributeK8SContainerName) + if containerName == "" { + return + } + containerSpec, ok := pod.Containers[containerName] if !ok { - return nil + return + } + + if containerSpec.ImageName != "" { + attrs.InsertString(conventions.AttributeContainerImageName, containerSpec.ImageName) + } + if containerSpec.ImageTag != "" { + attrs.InsertString(conventions.AttributeContainerImageTag, containerSpec.ImageTag) + } + + runIDAttr, ok := attrs.Get(k8sContainerRunIDLabelName) + if ok { + runID, err := intFromAttribute(runIDAttr) + if err == nil { + if containerStatus, ok := containerSpec.Statuses[runID]; ok && containerStatus.ContainerID != "" { + attrs.InsertString(conventions.AttributeContainerID, containerStatus.ContainerID) + } + } else { + kp.logger.Debug(err.Error()) + } } - return pod.Attributes } func (kp *kubernetesprocessor) getAttributesForPodsNamespace(namespace string) map[string]string { @@ -146,3 +177,19 @@ func (kp *kubernetesprocessor) getAttributesForPodsNamespace(namespace string) m } return ns.Attributes } + +// intFromAttribute extracts int value from an attribute stored as string or int +func intFromAttribute(val pdata.AttributeValue) (int, error) { + switch val.Type() { + case pdata.AttributeValueTypeInt: + return int(val.IntVal()), nil + case pdata.AttributeValueTypeString: + i, err := strconv.Atoi(val.StringVal()) + if err != nil { + return 0, err + } + return i, nil + default: + return 0, fmt.Errorf("wrong attribute type %v, expected int", val.Type()) + } +} diff --git a/processor/k8sattributesprocessor/processor_test.go b/processor/k8sattributesprocessor/processor_test.go index bac4301122d1..dda00e20ced7 100644 --- a/processor/k8sattributesprocessor/processor_test.go +++ b/processor/k8sattributesprocessor/processor_test.go @@ -283,6 +283,18 @@ func withPodUID(uid string) generateResourceFunc { } } +func withContainerName(containerName string) generateResourceFunc { + return func(res pdata.Resource) { + res.Attributes().InsertString(conventions.AttributeK8SContainerName, containerName) + } +} + +func withContainerRunID(containerRunID string) generateResourceFunc { + return func(res pdata.Resource) { + res.Attributes().InsertString(k8sContainerRunIDLabelName, containerRunID) + } +} + func TestIPDetectionFromContext(t *testing.T) { m := newMultiTest(t, NewFactory().CreateDefaultConfig(), nil) @@ -652,6 +664,146 @@ func TestProcessorAddLabels(t *testing.T) { } } +func TestProcessorAddContainerAttributes(t *testing.T) { + tests := []struct { + name string + op func(kp *kubernetesprocessor) + resourceGens []generateResourceFunc + wantAttrs map[string]string + }{ + { + name: "image-only", + op: func(kp *kubernetesprocessor) { + kp.podAssociations = []kube.Association{ + { + From: "resource_attribute", + Name: "k8s.pod.uid", + }, + } + kp.kc.(*fakeClient).Pods[kube.PodIdentifier("ef10d10b-2da5-4030-812e-5f45c1531227")] = &kube.Pod{ + Containers: map[string]*kube.Container{ + "app": { + ImageName: "test/app", + ImageTag: "1.0.1", + }, + }, + } + }, + resourceGens: []generateResourceFunc{ + withPodUID("ef10d10b-2da5-4030-812e-5f45c1531227"), + withContainerName("app"), + }, + wantAttrs: map[string]string{ + conventions.AttributeK8SPodUID: "ef10d10b-2da5-4030-812e-5f45c1531227", + conventions.AttributeK8SContainerName: "app", + conventions.AttributeContainerImageName: "test/app", + conventions.AttributeContainerImageTag: "1.0.1", + }, + }, + { + name: "container-id-only", + op: func(kp *kubernetesprocessor) { + kp.kc.(*fakeClient).Pods[kube.PodIdentifier("1.1.1.1")] = &kube.Pod{ + Containers: map[string]*kube.Container{ + "app": { + Statuses: map[int]kube.ContainerStatus{ + 0: {ContainerID: "fcd58c97330c1dc6615bd520031f6a703a7317cd92adc96013c4dd57daad0b5f"}, + 1: {ContainerID: "6a7f1a598b5dafec9c193f8f8d63f6e5839b8b0acd2fe780f94285e26c05580e"}, + }, + }, + }, + } + }, + resourceGens: []generateResourceFunc{ + withPassthroughIP("1.1.1.1"), + withContainerName("app"), + withContainerRunID("1"), + }, + wantAttrs: map[string]string{ + k8sIPLabelName: "1.1.1.1", + conventions.AttributeK8SContainerName: "app", + k8sContainerRunIDLabelName: "1", + conventions.AttributeContainerID: "6a7f1a598b5dafec9c193f8f8d63f6e5839b8b0acd2fe780f94285e26c05580e", + }, + }, + { + name: "container-name-mismatch", + op: func(kp *kubernetesprocessor) { + kp.kc.(*fakeClient).Pods[kube.PodIdentifier("1.1.1.1")] = &kube.Pod{ + Containers: map[string]*kube.Container{ + "app": { + ImageName: "test/app", + ImageTag: "1.0.1", + Statuses: map[int]kube.ContainerStatus{ + 0: {ContainerID: "fcd58c97330c1dc6615bd520031f6a703a7317cd92adc96013c4dd57daad0b5f"}, + }, + }, + }, + } + }, + resourceGens: []generateResourceFunc{ + withPassthroughIP("1.1.1.1"), + withContainerName("new-app"), + withContainerRunID("0"), + }, + wantAttrs: map[string]string{ + k8sIPLabelName: "1.1.1.1", + conventions.AttributeK8SContainerName: "new-app", + k8sContainerRunIDLabelName: "0", + }, + }, + { + name: "container-run-id-mismatch", + op: func(kp *kubernetesprocessor) { + kp.kc.(*fakeClient).Pods[kube.PodIdentifier("1.1.1.1")] = &kube.Pod{ + Containers: map[string]*kube.Container{ + "app": { + ImageName: "test/app", + Statuses: map[int]kube.ContainerStatus{ + 0: {ContainerID: "fcd58c97330c1dc6615bd520031f6a703a7317cd92adc96013c4dd57daad0b5f"}, + }, + }, + }, + } + }, + resourceGens: []generateResourceFunc{ + withPassthroughIP("1.1.1.1"), + withContainerName("app"), + withContainerRunID("1"), + }, + wantAttrs: map[string]string{ + k8sIPLabelName: "1.1.1.1", + conventions.AttributeK8SContainerName: "app", + k8sContainerRunIDLabelName: "1", + conventions.AttributeContainerImageName: "test/app", + }, + }, + } + + for _, tt := range tests { + m := newMultiTest( + t, + NewFactory().CreateDefaultConfig(), + nil, + ) + m.kubernetesProcessorOperation(tt.op) + m.testConsume(context.Background(), + generateTraces(tt.resourceGens...), + generateMetrics(tt.resourceGens...), + generateLogs(tt.resourceGens...), + nil, + ) + + m.assertBatchesLen(1) + m.assertResource(0, func(r pdata.Resource) { + require.Equal(t, len(tt.wantAttrs), r.Attributes().Len()) + for k, v := range tt.wantAttrs { + assertResourceHasStringAttribute(t, r, k, v) + } + }) + } +} + func TestProcessorPicksUpPassthoughPodIp(t *testing.T) { m := newMultiTest( t, @@ -908,3 +1060,48 @@ func assertResourceHasStringAttribute(t *testing.T, r pdata.Resource, k, v strin assert.EqualValues(t, pdata.AttributeValueTypeString, got.Type(), "attribute %s is not of type string", k) assert.EqualValues(t, v, got.StringVal(), "attribute %s is not equal to %s", k, v) } + +func Test_intFromAttribute(t *testing.T) { + tests := []struct { + name string + attrVal pdata.AttributeValue + wantInt int + wantErr bool + }{ + { + name: "wrong-type", + attrVal: pdata.NewAttributeValueBool(true), + wantInt: 0, + wantErr: true, + }, + { + name: "wrong-string-number", + attrVal: pdata.NewAttributeValueString("NaN"), + wantInt: 0, + wantErr: true, + }, + { + name: "valid-string-number", + attrVal: pdata.NewAttributeValueString("3"), + wantInt: 3, + wantErr: false, + }, + { + name: "valid-int-number", + attrVal: pdata.NewAttributeValueInt(1), + wantInt: 1, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := intFromAttribute(tt.attrVal) + assert.Equal(t, tt.wantInt, got) + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +}