Skip to content

Commit

Permalink
Merge pull request #9 from mgtv-tech/feat_synclocal
Browse files Browse the repository at this point in the history
Feat synclocal
  • Loading branch information
daoshenzzg authored May 20, 2024
2 parents dab32d2 + e20766b commit f623c4d
Show file tree
Hide file tree
Showing 11 changed files with 636 additions and 95 deletions.
91 changes: 76 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ Translations: [English](README_en.md) | [简体中文](README.md)
- ✅ 支持开启分布式缓存异步刷新
- ✅ 指标采集,默认实现了通过日志打印各级缓存的统计指标(QPM、Hit、Miss、Query、QueryFail)
- ✅ 分布式缓存查询故障自动降级
-`MGet`接口支持`Load`函数。带分布缓存场景,采用`Pipeline`模式实现
-`MGet`接口支持`Load`函数。带分布缓存场景,采用`Pipeline`模式实现 (v1.1.0+)
- ✅ 支持拓展缓存更新后所有GO进程的本地缓存失效 (v1.1.1+)

# 安装
使用最新版本的jetcache-go,您可以在项目中导入该库:
Expand All @@ -37,6 +38,7 @@ package cache_test
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"time"
Expand Down Expand Up @@ -162,25 +164,84 @@ func Example_mGetUsage() {

cacheT.Close()
}

func Example_syncLocalUsage() {
ring := redis.NewRing(&redis.RingOptions{
Addrs: map[string]string{
"localhost": ":6379",
},
})

sourceID := "12345678" // Unique identifier for this cache instance
channelName := "syncLocalChannel"
pubSub := ring.Subscribe(context.Background(), channelName)

mycache := cache.New(cache.WithName("any"),
cache.WithRemote(remote.NewGoRedisV8Adaptor(ring)),
cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
cache.WithErrNotFound(errRecordNotFound),
cache.WithRemoteExpiry(time.Minute),
cache.WithSourceId(sourceID),
cache.WithSyncLocal(true),
cache.WithEventHandler(func(event *cache.Event) {
// Broadcast local cache invalidation for the received keys
bs, _ := json.Marshal(event)
ring.Publish(context.Background(), channelName, string(bs))
}),
)
obj, _ := mockDBGetObject(1)
if err := mycache.Set(context.TODO(), "mykey", cache.Value(obj), cache.TTL(time.Hour)); err != nil {
panic(err)
}

go func() {
for {
msg, err := pubSub.ReceiveMessage(context.Background())
if err != nil {
panic(err)
}
var event *cache.Event
if err = json.Unmarshal([]byte(msg.Payload), &event); err != nil {
panic(err)
}
fmt.Println(event.Keys)

// Invalidate local cache for received keys (except own events)
if event.SourceID != sourceID {
for _, key := range event.Keys {
mycache.DeleteFromLocalCache(key)
}
}
}
}()

// Output: [mykey]
mycache.Close()
time.Sleep(time.Second)
}
```

### 配置选项
```go
// Options are used to store cache options.
type Options struct {
name string // Cache name, used for log identification and metric reporting
remote remote.Remote // Remote is distributed cache, such as Redis.
local local.Local // Local is memory cache, such as FreeCache.
codec string // Value encoding and decoding method. Default is "msgpack.Name". You can also customize it.
errNotFound error // Error to return for cache miss. Used to prevent cache penetration.
remoteExpiry time.Duration // Remote cache ttl, Default is 1 hour.
notFoundExpiry time.Duration // Duration for placeholder cache when there is a cache miss. Default is 1 minute.
offset time.Duration // Expiration time jitter factor for cache misses.
refreshDuration time.Duration // Interval for asynchronous cache refresh. Default is 0 (refresh is disabled).
stopRefreshAfterLastAccess time.Duration // Duration for cache to stop refreshing after no access. Default is refreshDuration + 1 second.
refreshConcurrency int // Maximum number of concurrent cache refreshes. Default is 4.
statsDisabled bool // Flag to disable cache statistics.
statsHandler stats.Handler // Metrics statsHandler collector.
Options struct {
name string // Cache name, used for log identification and metric reporting
remote remote.Remote // Remote is distributed cache, such as Redis.
local local.Local // Local is memory cache, such as FreeCache.
codec string // Value encoding and decoding method. Default is "msgpack.Name". You can also customize it.
errNotFound error // Error to return for cache miss. Used to prevent cache penetration.
remoteExpiry time.Duration // Remote cache ttl, Default is 1 hour.
notFoundExpiry time.Duration // Duration for placeholder cache when there is a cache miss. Default is 1 minute.
offset time.Duration // Expiration time jitter factor for cache misses.
refreshDuration time.Duration // Interval for asynchronous cache refresh. Default is 0 (refresh is disabled).
stopRefreshAfterLastAccess time.Duration // Duration for cache to stop refreshing after no access. Default is refreshDuration + 1 second.
refreshConcurrency int // Maximum number of concurrent cache refreshes. Default is 4.
statsDisabled bool // Flag to disable cache statistics.
statsHandler stats.Handler // Metrics statsHandler collector.
sourceID string // Unique identifier for cache instance.
syncLocal bool // Enable events for syncing local cache (only for "Both" cache type).
eventChBufSize int // Buffer size for event channel (default: 100).
eventHandler func(event *Event) // Function to handle local cache invalidation events.
}
```

Expand Down
91 changes: 76 additions & 15 deletions README_en.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ Translate to: [简体中文](README.md)
- ✅ Supports asynchronous refreshing of distributed caches.
- ✅ Metrics collection: By default, it prints statistical metrics (QPM, Hit, Miss, Query, QueryFail) through logs.
- ✅ Automatic degradation of distributed cache query failures.
- ✅ The `MGet` interface supports the `Load` function. In a distributed caching scenario, the Pipeline mode is used to improve performance.
- ✅ The `MGet` interface supports the `Load` function. In a distributed caching scenario, the Pipeline mode is used to improve performance. (v1.1.0+)
- ✅ Invalidate local caches (in all Go processes) after updates (v1.1.1+)

# Installation
To start using the latest version of jetcache-go, you can import the library into your project:
Expand All @@ -38,6 +39,7 @@ package cache_test
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"time"
Expand Down Expand Up @@ -163,25 +165,84 @@ func Example_mGetUsage() {

cacheT.Close()
}

func Example_syncLocalUsage() {
ring := redis.NewRing(&redis.RingOptions{
Addrs: map[string]string{
"localhost": ":6379",
},
})

sourceID := "12345678" // Unique identifier for this cache instance
channelName := "syncLocalChannel"
pubSub := ring.Subscribe(context.Background(), channelName)

mycache := cache.New(cache.WithName("any"),
cache.WithRemote(remote.NewGoRedisV8Adaptor(ring)),
cache.WithLocal(local.NewFreeCache(256*local.MB, time.Minute)),
cache.WithErrNotFound(errRecordNotFound),
cache.WithRemoteExpiry(time.Minute),
cache.WithSourceId(sourceID),
cache.WithSyncLocal(true),
cache.WithEventHandler(func(event *cache.Event) {
// Broadcast local cache invalidation for the received keys
bs, _ := json.Marshal(event)
ring.Publish(context.Background(), channelName, string(bs))
}),
)
obj, _ := mockDBGetObject(1)
if err := mycache.Set(context.TODO(), "mykey", cache.Value(obj), cache.TTL(time.Hour)); err != nil {
panic(err)
}

go func() {
for {
msg, err := pubSub.ReceiveMessage(context.Background())
if err != nil {
panic(err)
}
var event *cache.Event
if err = json.Unmarshal([]byte(msg.Payload), &event); err != nil {
panic(err)
}
fmt.Println(event.Keys)

// Invalidate local cache for received keys (except own events)
if event.SourceID != sourceID {
for _, key := range event.Keys {
mycache.DeleteFromLocalCache(key)
}
}
}
}()

// Output: [mykey]
mycache.Close()
time.Sleep(time.Second)
}
```

### Configure settings
```go
// Options are used to store cache options.
type Options struct {
name string // Cache name, used for log identification and metric reporting
remote remote.Remote // Remote is distributed cache, such as Redis.
local local.Local // Local is memory cache, such as FreeCache.
codec string // Value encoding and decoding method. Default is "msgpack.Name". You can also customize it.
errNotFound error // Error to return for cache miss. Used to prevent cache penetration.
remoteExpiry time.Duration // Remote cache ttl, Default is 1 hour.
notFoundExpiry time.Duration // Duration for placeholder cache when there is a cache miss. Default is 1 minute.
offset time.Duration // Expiration time jitter factor for cache misses.
refreshDuration time.Duration // Interval for asynchronous cache refresh. Default is 0 (refresh is disabled).
stopRefreshAfterLastAccess time.Duration // Duration for cache to stop refreshing after no access. Default is refreshDuration + 1 second.
refreshConcurrency int // Maximum number of concurrent cache refreshes. Default is 4.
statsDisabled bool // Flag to disable cache statistics.
statsHandler stats.Handler // Metrics statsHandler collector.
Options struct {
name string // Cache name, used for log identification and metric reporting
remote remote.Remote // Remote is distributed cache, such as Redis.
local local.Local // Local is memory cache, such as FreeCache.
codec string // Value encoding and decoding method. Default is "msgpack.Name". You can also customize it.
errNotFound error // Error to return for cache miss. Used to prevent cache penetration.
remoteExpiry time.Duration // Remote cache ttl, Default is 1 hour.
notFoundExpiry time.Duration // Duration for placeholder cache when there is a cache miss. Default is 1 minute.
offset time.Duration // Expiration time jitter factor for cache misses.
refreshDuration time.Duration // Interval for asynchronous cache refresh. Default is 0 (refresh is disabled).
stopRefreshAfterLastAccess time.Duration // Duration for cache to stop refreshing after no access. Default is refreshDuration + 1 second.
refreshConcurrency int // Maximum number of concurrent cache refreshes. Default is 4.
statsDisabled bool // Flag to disable cache statistics.
statsHandler stats.Handler // Metrics statsHandler collector.
sourceID string // Unique identifier for cache instance.
syncLocal bool // Enable events for syncing local cache (only for "Both" cache type).
eventChBufSize int // Buffer size for event channel (default: 100).
eventHandler func(event *Event) // Function to handle local cache invalidation events.
}
```

Expand Down
Loading

0 comments on commit f623c4d

Please sign in to comment.