Skip to content

Commit

Permalink
[extension/storage/filestorage] Add CleanupOnStart flag for compactio…
Browse files Browse the repository at this point in the history
…n temporary files (#32863)

**Description:**
This PR includes a new flag **cleanup_on_start** for the compaction
section.
During compaction a copy of the database is created, when the process is
unexpectedly terminated that temporary file is not removed. That could
lead to disk exhaustion given the following scenario:
- Process is killed with a big database to be compacted
- Compaction is enabled on start
- Process will take longer to compact than the allotted time for the
collector to reply health checks (see: #32456)
- Process is killed while compacting
- Big temporary file left

This mitigates the potential risk of those temporary files left in a
short period of time, by this scenario or similar ones.

**Testing:**
Included corner case where two instances of the extensions are spawned
and one is compacting while the other would attempt to cleanup.

**Documentation:** 
Included description in the README of the new configuration flag

---------

Co-authored-by: Daniel Jaglowski <jaglows3@gmail.com>
  • Loading branch information
pandres-varian and djaglowski authored May 16, 2024
1 parent d66c0dc commit 4964cd8
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 2 deletions.
29 changes: 29 additions & 0 deletions .chloggen/extension-storage-filestorage-cleanup_on_start.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: extension/storage/filestorage

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: New flag cleanup_on_start for the compaction section (default=false).

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32863]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
It will remove all temporary files in the compaction directory (those which start with `tempdb`),
temp files will be left if a previous run of the process is killed while compacting.
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
4 changes: 4 additions & 0 deletions extension/storage/filestorage/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ The default timeout is `1s`.
`compaction.max_transaction_size` (default: 65536): defines maximum size of the compaction transaction.
A value of zero will ignore transaction sizes.

`compaction.cleanup_on_start` (default: false) - specifies if removal of compaction temporary files is performed on start.
It will remove all temporary files in the compaction directory (those which start with `tempdb`),
temp files will be left if a previous run of the process is killed while compacting.

### Rebound (online) compaction

For rebound compaction, there are two additional parameters available:
Expand Down
4 changes: 3 additions & 1 deletion extension/storage/filestorage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
var defaultBucket = []byte(`default`)

const (
TempDbPrefix = "tempdb"

elapsedKey = "elapsed"
directoryKey = "directory"
tempDirectoryKey = "tempDirectory"
Expand Down Expand Up @@ -152,7 +154,7 @@ func (c *fileStorageClient) Compact(compactionDirectory string, timeout time.Dur
var compactedDb *bbolt.DB

// create temporary file in compactionDirectory
file, err = os.CreateTemp(compactionDirectory, "tempdb")
file, err = os.CreateTemp(compactionDirectory, TempDbPrefix)
if err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions extension/storage/filestorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ type CompactionConfig struct {
MaxTransactionSize int64 `mapstructure:"max_transaction_size,omitempty"`
// CheckInterval specifies frequency of compaction check
CheckInterval time.Duration `mapstructure:"check_interval,omitempty"`
// CleanupOnStart specifies removal of temporary files is performed on start.
// It will remove all the files in the compaction directory starting with tempdb,
// temp files will be left if a previous run of the process is killed while compacting.
CleanupOnStart bool `mapstructure:"cleanup_on_start,omitempty"`
}

func (cfg *Config) Validate() error {
Expand Down
1 change: 1 addition & 0 deletions extension/storage/filestorage/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func TestLoadConfig(t *testing.T) {
ReboundTriggerThresholdMiB: 16,
ReboundNeededThresholdMiB: 128,
CheckInterval: time.Second * 5,
CleanupOnStart: true,
},
Timeout: 2 * time.Second,
FSync: true,
Expand Down
34 changes: 33 additions & 1 deletion extension/storage/filestorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ package filestorage // import "github.com/open-telemetry/opentelemetry-collector

import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"strings"

Expand Down Expand Up @@ -40,8 +42,11 @@ func newLocalFileStorage(logger *zap.Logger, config *Config) (extension.Extensio
}, nil
}

// Start does nothing
// Start runs cleanup if configured
func (lfs *localFileStorage) Start(context.Context, component.Host) error {
if lfs.cfg.Compaction.CleanupOnStart {
return lfs.cleanup(lfs.cfg.Compaction.Directory)
}
return nil
}

Expand Down Expand Up @@ -135,3 +140,30 @@ func isSafe(character rune) bool {
}
return false
}

// cleanup left compaction temporary files from previous killed process
func (lfs *localFileStorage) cleanup(compactionDirectory string) error {
pattern := filepath.Join(compactionDirectory, fmt.Sprintf("%s*", TempDbPrefix))
contents, err := filepath.Glob(pattern)
if err != nil {
lfs.logger.Info("cleanup error listing temporary files",
zap.Error(err))
return err
}

var errs []error
for _, item := range contents {
err = os.Remove(item)
if err == nil {
lfs.logger.Debug("cleanup",
zap.String("deletedFile", item))
} else {
errs = append(errs, err)
}
}
if errs != nil {
lfs.logger.Info("cleanup errors",
zap.Error(errors.Join(errs...)))
}
return nil
}
37 changes: 37 additions & 0 deletions extension/storage/filestorage/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/extension/experimental/storage"
"go.opentelemetry.io/collector/extension/extensiontest"
)
Expand Down Expand Up @@ -448,3 +449,39 @@ func TestCompactionRemoveTemp(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 0, len(files))
}

func TestCleanupOnStart(t *testing.T) {
ctx := context.Background()

tempDir := t.TempDir()
// simulate left temporary compaction file from killed process
temp, _ := os.CreateTemp(tempDir, TempDbPrefix)
temp.Close()

f := NewFactory()
cfg := f.CreateDefaultConfig().(*Config)
cfg.Directory = tempDir
cfg.Compaction.Directory = tempDir
cfg.Compaction.CleanupOnStart = true
extension, err := f.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), cfg)
require.NoError(t, err)

se, ok := extension.(storage.Extension)
require.True(t, ok)
require.NoError(t, se.Start(ctx, componenttest.NewNopHost()))

client, err := se.GetClient(
ctx,
component.KindReceiver,
newTestEntity("my_component"),
"",
)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, client.Close(ctx))
})

files, err := os.ReadDir(tempDir)
require.NoError(t, err)
require.Equal(t, 1, len(files))
}
1 change: 1 addition & 0 deletions extension/storage/filestorage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func createDefaultConfig() component.Config {
ReboundNeededThresholdMiB: defaultReboundNeededThresholdMib,
ReboundTriggerThresholdMiB: defaultReboundTriggerThresholdMib,
CheckInterval: defaultCompactionInterval,
CleanupOnStart: false,
},
Timeout: time.Second,
FSync: false,
Expand Down
1 change: 1 addition & 0 deletions extension/storage/filestorage/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ file_storage/all_settings:
rebound_trigger_threshold_mib: 16
rebound_needed_threshold_mib: 128
max_transaction_size: 2048
cleanup_on_start: true
timeout: 2s
fsync: true

0 comments on commit 4964cd8

Please sign in to comment.