Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Harden and simplify yaml stream decoding/encoding #1931

Merged
merged 4 commits into from
Apr 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cluster/kubernetes/images.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (c *Cluster) ImagesToFetch() registry.ImageCreds {

imageCreds := make(registry.ImageCreds)
for _, workload := range workloads {
logger := log.With(c.logger, "resource", flux.MakeResourceID(ns.Name, kind, workload.name))
logger := log.With(c.logger, "resource", flux.MakeResourceID(ns.Name, kind, workload.GetName()))
mergeCredentials(logger.Log, c.includeImage, c.client, ns.Name, workload.podTemplate, imageCreds, seenCreds)
}

Expand Down
40 changes: 24 additions & 16 deletions cluster/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package kubernetes

import (
"bytes"
"encoding/json"
"fmt"
"sync"

k8syaml "github.com/ghodss/yaml"
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"gopkg.in/yaml.v2"
apiv1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -58,6 +59,7 @@ func MakeClusterClientset(core coreClient, dyn dynamicClient, fluxhelm fluxHelmC
// Kubernetes metadata. These methods are implemented by the
// Kubernetes API resource types.
type k8sObject interface {
GetName() string
GetNamespace() string
GetLabels() map[string]string
GetAnnotations() map[string]string
Expand Down Expand Up @@ -186,7 +188,7 @@ func (c *Cluster) AllWorkloads(namespace string) (res []cluster.Workload, err er

for _, workload := range workloads {
if !isAddon(workload) {
id := flux.MakeResourceID(ns.Name, kind, workload.name)
id := flux.MakeResourceID(ns.Name, kind, workload.GetName())
c.muSyncErrors.RLock()
workload.syncError = c.syncErrors[id]
c.muSyncErrors.RUnlock()
Expand Down Expand Up @@ -222,8 +224,14 @@ func (c *Cluster) Export() ([]byte, error) {
return nil, errors.Wrap(err, "getting namespaces")
}

encoder := yaml.NewEncoder(&config)
defer encoder.Close()

for _, ns := range namespaces {
err := appendYAML(&config, "v1", "Namespace", ns)
// kind & apiVersion must be set, since TypeMeta is not populated
ns.Kind = "Namespace"
ns.APIVersion = "v1"
err := encoder.Encode(yamlThroughJSON{ns})
if err != nil {
return nil, errors.Wrap(err, "marshalling namespace to YAML")
}
Expand All @@ -246,7 +254,7 @@ func (c *Cluster) Export() ([]byte, error) {

for _, pc := range workloads {
if !isAddon(pc) {
if err := appendYAML(&config, pc.apiVersion, pc.kind, pc.k8sObject); err != nil {
if err := encoder.Encode(yamlThroughJSON{pc.k8sObject}); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -325,18 +333,18 @@ func (c *Cluster) IsAllowedResource(id flux.ResourceID) bool {
return false
}

// kind & apiVersion must be passed separately as the object's TypeMeta is not populated
func appendYAML(buffer *bytes.Buffer, apiVersion, kind string, object interface{}) error {
yamlBytes, err := k8syaml.Marshal(object)
// yamlThroughJSON wraps a value so that encoding it as YAML goes via its
// JSON representation: MarshalYAML marshals toMarshal to JSON and hands the
// re-parsed generic value to the YAML encoder.
// NOTE(review): presumably needed because the wrapped Kubernetes objects
// only carry `json` struct tags -- confirm.
type yamlThroughJSON struct {
toMarshal interface{} // value whose JSON form is emitted as YAML
}

func (y yamlThroughJSON) MarshalYAML() (interface{}, error) {
rawJSON, err := json.Marshal(y.toMarshal)
if err != nil {
return err
return nil, fmt.Errorf("error marshaling into JSON: %s", err)
}
var jsonObj interface{}
if err = yaml.Unmarshal(rawJSON, &jsonObj); err != nil {
return nil, fmt.Errorf("error unmarshaling from JSON: %s", err)
}
buffer.WriteString("---\n")
buffer.WriteString("apiVersion: ")
buffer.WriteString(apiVersion)
buffer.WriteString("\nkind: ")
buffer.WriteString(kind)
buffer.WriteString("\n")
buffer.Write(yamlBytes)
return nil
return jsonObj, nil
}
73 changes: 21 additions & 52 deletions cluster/kubernetes/resource/load.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package resource

import (
"bufio"
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"

"github.com/pkg/errors"
"gopkg.in/yaml.v2"
)

// Load takes paths to directories or files, and creates an object set
Expand Down Expand Up @@ -128,21 +129,26 @@ func looksLikeChart(dir string) bool {
// constructs an object set from the resources represented therein.
func ParseMultidoc(multidoc []byte, source string) (map[string]KubeManifest, error) {
objs := map[string]KubeManifest{}
chunks := bufio.NewScanner(bytes.NewReader(multidoc))
initialBuffer := make([]byte, 4096) // Matches startBufSize in bufio/scan.go
chunks.Buffer(initialBuffer, 1024*1024) // Allow growth to 1MB
chunks.Split(splitYAMLDocument)

decoder := yaml.NewDecoder(bytes.NewReader(multidoc))
var obj KubeManifest
var err error
for chunks.Scan() {
// It's not guaranteed that the return value of Bytes() will not be mutated later:
// https://golang.org/pkg/bufio/#Scanner.Bytes
// But we will be snaffling it away, so make a copy.
bytes := chunks.Bytes()
bytes2 := make([]byte, len(bytes), cap(bytes))
copy(bytes2, bytes)
if obj, err = unmarshalObject(source, bytes2); err != nil {
for {
// In order to use the decoder to extract raw documents
// from the stream, we decode generically and encode again.
// The result is the raw document from the stream
// (pretty-printed and without comments)
// NOTE: gopkg.in/yaml.v3 supports round tripping comments
// by using `gopkg.in/yaml.v3.Node`.
var val interface{}
if err := decoder.Decode(&val); err != nil {
break
}
bytes, err := yaml.Marshal(val)
if err != nil {
return nil, errors.Wrapf(err, "parsing YAML doc from %q", source)
}

if obj, err = unmarshalObject(source, bytes); err != nil {
return nil, errors.Wrapf(err, "parsing YAML doc from %q", source)
}
if obj == nil {
Expand All @@ -159,45 +165,8 @@ func ParseMultidoc(multidoc []byte, source string) (map[string]KubeManifest, err
}
}

if err := chunks.Err(); err != nil {
if err != io.EOF {
return objs, errors.Wrapf(err, "scanning multidoc from %q", source)
}
return objs, nil
}

// ---
// Taken directly from https://github.com/kubernetes/apimachinery/blob/master/pkg/util/yaml/decoder.go.

const yamlSeparator = "\n---"

// splitYAMLDocument is a bufio.SplitFunc for splitting YAML streams into
// individual documents. A document ends at a newline followed by "---";
// the remainder of that separator line is consumed along with the separator.
//
// data is the unprocessed input and atEOF reports that no more input will
// follow. It returns the number of bytes to consume and the document
// (without the separator); a nil token with a zero advance asks for more
// data. The returned error is always nil.
//
// Adapted from the Kubernetes apimachinery YAML decoder. Unlike that
// version, a final separator line that is not newline-terminated
// (e.g. "--- " at EOF) still yields the document before it, rather than
// returning no token at EOF.
func splitYAMLDocument(data []byte, atEOF bool) (advance int, token []byte, err error) {
	if atEOF && len(data) == 0 {
		return 0, nil, nil
	}

	start := bytes.Index(data, []byte(yamlSeparator))
	if start < 0 {
		// If we're at EOF, we have a final, non-terminated document. Return it.
		if atEOF {
			return len(data), data, nil
		}
		// Request more data.
		return 0, nil, nil
	}

	// We have a potential document terminator.
	end := start + len(yamlSeparator)
	if j := bytes.IndexByte(data[end:], '\n'); j >= 0 {
		// Skip the rest of the separator line, including its newline.
		return end + j + 1, data[:start], nil
	}
	if atEOF {
		// No newline will ever arrive: whatever follows the separator is the
		// tail of the separator line, so emit the preceding document now.
		return len(data), data[:start], nil
	}
	// The separator line may still be incomplete; request more data.
	return 0, nil, nil
}

// ---
21 changes: 21 additions & 0 deletions cluster/kubernetes/resource/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,27 @@ data:
}
}

// TestParseBoundaryMarkers feeds ParseMultidoc a stream in which a single
// ConfigMap is followed by documents made up solely of "---" and "..."
// markers, and expects parsing to succeed with exactly one resource.
func TestParseBoundaryMarkers(t *testing.T) {
	const multidoc = `---
kind: ConfigMap
metadata:
name: bigmap
---
...
---
...
---
...
---
...
`
	parsed, err := ParseMultidoc(bytes.NewBufferString(multidoc).Bytes(), "test")
	assert.NoError(t, err)
	assert.Len(t, parsed, 1)
}

func TestParseCronJob(t *testing.T) {
doc := `---
apiVersion: batch/v1beta1
Expand Down
41 changes: 17 additions & 24 deletions cluster/kubernetes/resourcekinds.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ func init() {

type workload struct {
k8sObject
apiVersion string
kind string
name string
status string
rollout cluster.RolloutStatus
syncError error
Expand Down Expand Up @@ -179,11 +176,10 @@ func makeDeploymentWorkload(deployment *apiapps.Deployment) workload {
status = cluster.StatusError
}
}

// apiVersion & kind must be set, since TypeMeta is not populated
deployment.APIVersion = "apps/v1"
deployment.Kind = "Deployment"
return workload{
apiVersion: "apps/v1",
kind: "Deployment",
name: deployment.ObjectMeta.Name,
status: status,
rollout: rollout,
podTemplate: deployment.Spec.Template,
Expand Down Expand Up @@ -241,10 +237,10 @@ func makeDaemonSetWorkload(daemonSet *apiapps.DaemonSet) workload {
}
}

// apiVersion & kind must be set, since TypeMeta is not populated
daemonSet.APIVersion = "apps/v1"
daemonSet.Kind = "DaemonSet"
return workload{
apiVersion: "apps/v1",
kind: "DaemonSet",
name: daemonSet.ObjectMeta.Name,
status: status,
rollout: rollout,
podTemplate: daemonSet.Spec.Template,
Expand Down Expand Up @@ -334,10 +330,10 @@ func makeStatefulSetWorkload(statefulSet *apiapps.StatefulSet) workload {
}
}

// apiVersion & kind must be set, since TypeMeta is not populated
statefulSet.APIVersion = "apps/v1"
statefulSet.Kind = "StatefulSet"
return workload{
apiVersion: "apps/v1",
kind: "StatefulSet",
name: statefulSet.ObjectMeta.Name,
status: status,
rollout: rollout,
podTemplate: statefulSet.Spec.Template,
Expand Down Expand Up @@ -373,10 +369,9 @@ func (dk *cronJobKind) getWorkloads(c *Cluster, namespace string) ([]workload, e
}

func makeCronJobWorkload(cronJob *apibatch.CronJob) workload {
cronJob.APIVersion = "batch/v1beta1"
cronJob.Kind = "CronJob"
return workload{
apiVersion: "batch/v1beta1",
kind: "CronJob",
name: cronJob.ObjectMeta.Name,
status: cluster.StatusReady,
podTemplate: cronJob.Spec.JobTemplate.Spec.Template,
k8sObject: cronJob}
Expand Down Expand Up @@ -419,11 +414,10 @@ func makeFluxHelmReleaseWorkload(fluxHelmRelease *fhr_v1alpha2.FluxHelmRelease)
ImagePullSecrets: []apiv1.LocalObjectReference{},
},
}

// apiVersion & kind must be set, since TypeMeta is not populated
fluxHelmRelease.APIVersion = "helm.integrations.flux.weave.works/v1alpha2"
fluxHelmRelease.Kind = "FluxHelmRelease"
return workload{
apiVersion: "helm.integrations.flux.weave.works/v1alpha2",
kind: "FluxHelmRelease",
name: fluxHelmRelease.ObjectMeta.Name,
status: fluxHelmRelease.Status.ReleaseStatus,
podTemplate: podTemplate,
k8sObject: fluxHelmRelease,
Expand Down Expand Up @@ -482,11 +476,10 @@ func makeHelmReleaseWorkload(helmRelease *fhr_v1beta1.HelmRelease) workload {
ImagePullSecrets: []apiv1.LocalObjectReference{},
},
}

// apiVersion & kind must be set, since TypeMeta is not populated
helmRelease.APIVersion = "flux.weave.works/v1beta1"
helmRelease.Kind = "HelmRelease"
return workload{
apiVersion: "flux.weave.works/v1beta1",
kind: "HelmRelease",
name: helmRelease.ObjectMeta.Name,
status: helmRelease.Status.ReleaseStatus,
podTemplate: podTemplate,
k8sObject: helmRelease,
Expand Down
Loading