Skip to content

Commit

Permalink
feat: implement data retention feature
Browse files Browse the repository at this point in the history
- Add the retention configuration to the service configuration file.
- If Retention.Enabled is set to true, core data creates a schedule that triggers reading purging. When the number of readings reaches MaxCap, core data purges readings until the count is back down to the minimum capacity (MinCap).

Close #4618

Signed-off-by: bruce <weichou1229@gmail.com>
  • Loading branch information
weichou1229 committed Sep 8, 2023
1 parent 541cc85 commit 2b2ffc9
Show file tree
Hide file tree
Showing 9 changed files with 307 additions and 36 deletions.
6 changes: 6 additions & 0 deletions cmd/core-data/res/configuration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,9 @@ MessageBus:

Database:
Name: "coredata"

Retention:
Enabled: false
Interval: 30s # Purging interval: how often core data checks whether the number of readings has reached the high watermark (MaxCap).
MaxCap: 10000 # Maximum capacity (high watermark): when the total number of readings reaches this value, readings are purged down to the minimum capacity.
MinCap: 8000 # Minimum capacity: the total number of readings that the database is reduced to during purging.
60 changes: 58 additions & 2 deletions internal/core/data/application/reading.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
//
// Copyright (C) 2021 IOTech Ltd
// Copyright (C) 2021-2023 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

package application

import (
"github.com/edgexfoundry/go-mod-bootstrap/v3/di"
"context"
"fmt"
"sync"
"time"

bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v3/di"
"github.com/edgexfoundry/go-mod-core-contracts/v3/dtos"
"github.com/edgexfoundry/go-mod-core-contracts/v3/errors"
"github.com/edgexfoundry/go-mod-core-contracts/v3/models"

"github.com/edgexfoundry/edgex-go/internal/core/data/container"
)

// asyncPurgeReadingOnce guards the start-up of the background purge goroutine so that
// AsyncPurgeReading launches it at most once, no matter how many times it is called.
var asyncPurgeReadingOnce sync.Once

// ReadingTotalCount return the count of all of readings currently stored in the database and error if any
func ReadingTotalCount(dic *di.Container) (uint32, errors.EdgeX) {
dbClient := container.DBClientFrom(dic.Get)
Expand Down Expand Up @@ -216,3 +223,52 @@ func ReadingsByDeviceNameAndResourceNamesAndTimeRange(deviceName string, resourc
}
return readings, totalCount, nil
}

// AsyncPurgeReading starts a background goroutine that periodically purges readings and
// their related events according to the configured retention capacity.
//
// The goroutine is started at most once (guarded by asyncPurgeReadingOnce); later calls
// return immediately without starting another one. Every interval it calls purgeReading,
// logs any error, and keeps running. It exits when ctx is cancelled.
//
// Parameters:
//   - interval: the delay between the end of one purge attempt and the start of the next.
//   - ctx: cancelling this context stops the goroutine.
//   - dic: the DI container used to resolve the logging client and, through purgeReading,
//     the database client and the service configuration.
func AsyncPurgeReading(interval time.Duration, ctx context.Context, dic *di.Container) {
	asyncPurgeReadingOnce.Do(func() {
		go func() {
			lc := bootstrapContainer.LoggingClientFrom(dic.Get)
			timer := time.NewTimer(interval)
			// Release the timer when the goroutine exits instead of leaving it pending.
			defer timer.Stop()
			for {
				select {
				case <-ctx.Done():
					lc.Info("Exiting reading retention")
					return
				case <-timer.C:
					// A failed purge is only logged; the next tick retries.
					if err := purgeReading(dic); err != nil {
						lc.Errorf("Failed to purge events and readings, %v", err)
					}
					// Since event deletion might take lots of time, restart the timer only
					// after the purge has finished so the full interval is counted again.
					// The timer has fired and its channel has been drained at this point,
					// which is the state in which Reset is safe to call.
					timer.Reset(interval)
				}
			}
		}()
	})
}

// purgeReading performs one retention pass over the stored readings.
//
// It queries the total number of readings and does nothing while that number is below
// Retention.MaxCap. Once the total reaches MaxCap, it looks up the reading returned by
// LatestReadingByOffset(MinCap), computes how old that reading is from its Origin, and
// deletes all events (and their readings) of that age or older via DeleteEventsByAge.
//
// It returns nil when no purge was needed or the purge succeeded; otherwise it returns an
// EdgeX error that keeps the kind of the underlying database error.
func purgeReading(dic *di.Container) errors.EdgeX {
	lc := bootstrapContainer.LoggingClientFrom(dic.Get)
	dbClient := container.DBClientFrom(dic.Get)
	config := container.ConfigurationFrom(dic.Get)

	total, err := dbClient.ReadingTotalCount()
	if err != nil {
		// NewCommonEdgeX does not format its message, so no format verbs here; the
		// underlying error is attached as the wrapped error.
		return errors.NewCommonEdgeX(errors.Kind(err), "failed to query reading total count", err)
	}
	if total < config.Retention.MaxCap {
		// Below the high watermark: nothing to purge.
		return nil
	}

	lc.Debugf("Purging the reading amount %d to the minimum capacity %d", total, config.Retention.MinCap)
	// Use the reading origin rather than the event origin to decide what to remove by age.
	// If we trimmed readings down to MinCap and then removed the related events, some of the
	// remaining readings could lose their related event.
	// NOTE(review): presumably LatestReadingByOffset returns the reading at position MinCap
	// when ordered newest first - confirm against the DB client implementation.
	reading, err := dbClient.LatestReadingByOffset(config.Retention.MinCap)
	if err != nil {
		return errors.NewCommonEdgeX(errors.Kind(err), fmt.Sprintf("failed to query reading with offset '%d'", config.Retention.MinCap), err)
	}

	// Age of the boundary reading; assumes Origin is a Unix timestamp in nanoseconds, as it
	// is subtracted from UnixNano - TODO confirm.
	age := time.Now().UnixNano() - reading.GetBaseReading().Origin
	if err = dbClient.DeleteEventsByAge(age); err != nil {
		return errors.NewCommonEdgeX(errors.Kind(err), fmt.Sprintf("failed to delete events and readings by age '%d'", age), err)
	}
	return nil
}
57 changes: 55 additions & 2 deletions internal/core/data/application/reading_test.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
//
// Copyright (C) 2023 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

package application

import (
"net/http"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/edgexfoundry/edgex-go/internal/core/data/config"
"github.com/edgexfoundry/edgex-go/internal/core/data/container"
dbMock "github.com/edgexfoundry/edgex-go/internal/core/data/infrastructure/interfaces/mocks"
"github.com/edgexfoundry/edgex-go/internal/core/data/mocks"
"github.com/edgexfoundry/go-mod-bootstrap/v3/di"
"github.com/edgexfoundry/go-mod-core-contracts/v3/errors"
"github.com/edgexfoundry/go-mod-core-contracts/v3/models"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestAllReadings(t *testing.T) {
Expand Down Expand Up @@ -242,3 +250,48 @@ func TestReadingCountByDeviceName(t *testing.T) {
})
}
}

// TestPurgeReading verifies that purgeReading deletes events only when the total reading
// count has reached the configured maximum capacity.
func TestPurgeReading(t *testing.T) {
	dic := mocks.NewMockDIC()

	// Enable retention with a small capacity window and register the updated configuration.
	cfg := container.ConfigurationFrom(dic.Get)
	cfg.Retention = config.ReadingRetention{
		Enabled:  true,
		Interval: "1s",
		MaxCap:   5,
		MinCap:   3,
	}
	dic.Update(di.ServiceConstructorMap{
		container.ConfigurationName: func(get di.Get) interface{} {
			return cfg
		},
	})

	maxCap := cfg.Retention.MaxCap
	minCap := cfg.Retention.MinCap

	cases := []struct {
		name         string
		readingCount uint32
	}{
		{"invoke reading purging", maxCap},
		{"not invoke reading purging", minCap},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Fresh DB mock per case so call assertions do not leak between cases.
			db := &dbMock.DBClient{}
			boundary := models.Reading(models.SimpleReading{})
			db.On("ReadingTotalCount").Return(tc.readingCount, nil)
			db.On("LatestReadingByOffset", minCap).Return(boundary, nil)
			db.On("DeleteEventsByAge", mock.Anything).Return(nil)
			dic.Update(di.ServiceConstructorMap{
				container.DBClientInterfaceName: func(get di.Get) interface{} {
					return db
				},
			})

			require.NoError(t, purgeReading(dic))

			if tc.readingCount < maxCap {
				// Below the high watermark nothing may be deleted.
				db.AssertNotCalled(t, "DeleteEventsByAge", mock.Anything)
				return
			}
			db.AssertCalled(t, "DeleteEventsByAge", mock.Anything)
		})
	}
}
8 changes: 8 additions & 0 deletions internal/core/data/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type ConfigurationStruct struct {
Registry bootstrapConfig.RegistryInfo
Service bootstrapConfig.ServiceInfo
MaxEventSize int64
Retention ReadingRetention
}

type WritableInfo struct {
Expand All @@ -35,6 +36,13 @@ type WritableInfo struct {
Telemetry bootstrapConfig.TelemetryInfo
}

// ReadingRetention holds the settings of the reading retention (purging) feature.
type ReadingRetention struct {
	// Enabled turns the scheduled purging of readings on or off.
	Enabled bool
	// Interval is the purging interval, written as a duration string such as "30s".
	Interval string
	// MaxCap is the high watermark: once the total number of readings reaches it, purging
	// is triggered. MinCap is the number of readings the purge reduces the total to.
	MaxCap, MinCap uint32
}

// UpdateFromRaw converts configuration received from the registry to a service-specific configuration struct which is
// then used to overwrite the service's existing configuration struct.
func (c *ConfigurationStruct) UpdateFromRaw(rawConfig interface{}) bool {
Expand Down
3 changes: 2 additions & 1 deletion internal/core/data/infrastructure/interfaces/db.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (C) 2020-2021 IOTech Ltd
// Copyright (C) 2020-2023 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand Down Expand Up @@ -41,4 +41,5 @@ type DBClient interface {
ReadingsByDeviceNameAndResourceNamesAndTimeRange(deviceName string, resourceNames []string, start, end, offset, limit int) ([]model.Reading, uint32, errors.EdgeX)
ReadingsByDeviceNameAndTimeRange(deviceName string, start int, end int, offset int, limit int) ([]model.Reading, errors.EdgeX)
ReadingCountByDeviceNameAndTimeRange(deviceName string, start int, end int) (uint32, errors.EdgeX)
LatestReadingByOffset(offset uint32) (model.Reading, errors.EdgeX)
}
Loading

0 comments on commit 2b2ffc9

Please sign in to comment.