-
Notifications
You must be signed in to change notification settings - Fork 2.5k
/
Copy pathextension.go
177 lines (159 loc) · 5.04 KB
/
extension.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package filestorage // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/filestorage"
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/extension/xextension/storage"
"go.uber.org/zap"
)
// localFileStorage is the file storage extension. It keeps the configuration
// and logger that are used when creating storage clients and cleaning up.
type localFileStorage struct {
// cfg is the extension configuration (directories, timeout, compaction and fsync settings).
cfg *Config
// logger is used for the extension's own messages and is passed to every client it creates.
logger *zap.Logger
}
// Ensure this storage extension implements the appropriate interface
var _ storage.Extension = (*localFileStorage)(nil)
// newLocalFileStorage builds the file storage extension from config.
//
// When config.CreateDirectory is set, the storage directory is created if it
// is missing; the compaction directory is created as well when compaction is
// enabled on start or on rebound. The first directory creation failure is
// returned and no extension is constructed.
func newLocalFileStorage(logger *zap.Logger, config *Config) (extension.Extension, error) {
	if config.CreateDirectory {
		// The storage directory is always needed; the compaction directory
		// only when some form of compaction is enabled.
		required := []string{config.Directory}
		if config.Compaction.OnStart || config.Compaction.OnRebound {
			required = append(required, config.Compaction.Directory)
		}

		mode := os.FileMode(config.directoryPermissionsParsed)
		for i := range required {
			if err := ensureDirectoryExists(required[i], mode); err != nil {
				return nil, err
			}
		}
	}

	lfs := &localFileStorage{logger: logger, cfg: config}
	return lfs, nil
}
// Start removes leftover temporary compaction files when
// Compaction.CleanupOnStart is enabled; otherwise it does nothing.
func (lfs *localFileStorage) Start(context.Context, component.Host) error {
	if !lfs.cfg.Compaction.CleanupOnStart {
		return nil
	}
	return lfs.cleanup(lfs.cfg.Compaction.Directory)
}
// Shutdown currently performs no work and always returns nil; the extension
// itself does not track any state that has to be released here.
func (lfs *localFileStorage) Shutdown(context.Context) error {
// TODO clean up data files that did not have a client
// and are older than a threshold (possibly configurable)
return nil
}
// GetClient returns a storage client for an individual component.
//
// The backing file is placed in the configured storage directory. Its name is
// "<kind>_<type>_<component name>", with "_<name>" appended when name is not
// empty, and is passed through sanitize before being joined to the directory.
//
// When Compaction.OnStart is set the new client is compacted right away; a
// compaction failure is logged and does not prevent the client from being
// returned. An error is returned only when the client cannot be created.
func (lfs *localFileStorage) GetClient(_ context.Context, kind component.Kind, ent component.ID, name string) (storage.Client, error) {
	fileName := fmt.Sprintf("%s_%s_%s", kindString(kind), ent.Type(), ent.Name())
	if name != "" {
		fileName += "_" + name
	}
	dbPath := filepath.Join(lfs.cfg.Directory, sanitize(fileName))

	client, err := newClient(lfs.logger, dbPath, lfs.cfg.Timeout, lfs.cfg.Compaction, !lfs.cfg.FSync)
	if err != nil {
		return nil, err
	}

	// return if compaction is not required
	if !lfs.cfg.Compaction.OnStart {
		return client, nil
	}

	if err := client.Compact(lfs.cfg.Compaction.Directory, lfs.cfg.Timeout, lfs.cfg.Compaction.MaxTransactionSize); err != nil {
		lfs.logger.Error("compaction on start failed", zap.Error(err))
	}
	return client, nil
}
// kindString maps a component kind to the lowercase label used as the first
// part of a storage file name. Kinds that are not listed map to "other".
func kindString(k component.Kind) string {
	label := "other" // not expected
	switch k {
	case component.KindReceiver:
		label = "receiver"
	case component.KindProcessor:
		label = "processor"
	case component.KindExporter:
		label = "exporter"
	case component.KindExtension:
		label = "extension"
	case component.KindConnector:
		label = "connector"
	}
	return label
}
// sanitize replaces characters in name that are not safe in a file path.
//
// Every character rejected by isSafe is replaced with a tilde followed by the
// character's Unicode code point in uppercase hex, zero-padded to at least
// four digits (code points above U+FFFF produce more digits).
// https://en.wikipedia.org/wiki/List_of_Unicode_characters
// For example, the slash is replaced with "~002F", and the tilde itself is replaced with "~007E".
// We perform replacement on the tilde even though it is a safe character to make sure that the sanitized component name
// never overlaps with a component name that does not require sanitization.
func sanitize(name string) string {
	var sanitized strings.Builder
	// The result is at least as long as the input when it is all safe ASCII,
	// so reserving len(name) avoids most reallocation.
	sanitized.Grow(len(name))
	for _, character := range name {
		if isSafe(character) {
			sanitized.WriteRune(character)
			continue
		}
		// Writes to a strings.Builder never fail, so the Fprintf result is not checked.
		fmt.Fprintf(&sanitized, "~%04X", character)
	}
	return sanitized.String()
}
// isSafe reports whether character may appear unescaped in a storage file name.
//
// Safe characters are the following:
// - uppercase and lowercase letters A-Z, a-z
// - digits 0-9
// - dot `.`
// - hyphen `-`
// - underscore `_`
func isSafe(character rune) bool {
	lowercase := 'a' <= character && character <= 'z'
	uppercase := 'A' <= character && character <= 'Z'
	digit := '0' <= character && character <= '9'
	punctuation := character == '.' || character == '-' || character == '_'
	return lowercase || uppercase || digit || punctuation
}
// ensureDirectoryExists creates path, together with any missing parent
// directories, using permissions perm when nothing exists at path yet.
//
// It returns the error from os.MkdirAll if creation fails. If path already
// exists, or os.Stat fails for any reason other than "does not exist", it
// returns nil without touching the file system.
func ensureDirectoryExists(path string, perm os.FileMode) error {
	// errors.Is is the recommended replacement for os.IsNotExist and also
	// recognizes wrapped errors.
	if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) {
		return os.MkdirAll(path, perm)
	}
	// we already handled other errors in config.Validate(), so it's okay to return nil
	return nil
}
// cleanup removes temporary compaction files left behind by a previously
// killed process.
//
// Every path in compactionDirectory whose name starts with TempDbPrefix is
// deleted. A failure to list the files is logged and returned. Failures to
// delete individual files are collected and logged as one joined error, but
// cleanup still returns nil in that case.
func (lfs *localFileStorage) cleanup(compactionDirectory string) error {
	leftovers, globErr := filepath.Glob(
		filepath.Join(compactionDirectory, fmt.Sprintf("%s*", TempDbPrefix)),
	)
	if globErr != nil {
		lfs.logger.Info("cleanup error listing temporary files",
			zap.Error(globErr))
		return globErr
	}

	var failures []error
	for _, leftover := range leftovers {
		if rmErr := os.Remove(leftover); rmErr != nil {
			failures = append(failures, rmErr)
			continue
		}
		lfs.logger.Debug("cleanup",
			zap.String("deletedFile", leftover))
	}

	// Deletion problems are reported but deliberately not returned.
	if len(failures) > 0 {
		lfs.logger.Info("cleanup errors",
			zap.Error(errors.Join(failures...)))
	}
	return nil
}