Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

should be able to create a redis ring from universal options and universal ctor #2727

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
c4706aa
should be able to create a redis ring from universal options and univ…
peczenyj Sep 21, 2023
788fbd3
add explanation for fields in ring
peczenyj Sep 21, 2023
1244908
fix doc
peczenyj Sep 21, 2023
6ec2570
Merge branch 'master' into should-be-able-to-create-a-ring-from-unive…
peczenyj Oct 25, 2023
c4c39b2
Merge branch 'master' into should-be-able-to-create-a-ring-from-unive…
peczenyj Oct 30, 2023
f160b18
fix merge
peczenyj Oct 30, 2023
82cdd9d
Merge branch 'master' into should-be-able-to-create-a-ring-from-unive…
peczenyj Oct 31, 2023
04a09a0
Merge branch 'master' into should-be-able-to-create-a-ring-from-unive…
peczenyj Dec 20, 2023
302ebb0
Merge branch 'master' into should-be-able-to-create-a-ring-from-unive…
peczenyj Mar 12, 2024
d8c18ca
Merge branch 'master' into should-be-able-to-create-a-ring-from-unive…
peczenyj Apr 11, 2024
f5496d1
Fix for issues #2959 and #2960 (#2961)
singular-seal Apr 12, 2024
6960bcc
chore: fix some comments (#2967)
kindknow Apr 13, 2024
f7f34fe
Optimize docs useless imports and typo (#2970)
hakusai22 Apr 14, 2024
9f1522a
Fix typo in comment (#2972)
deferdeter Apr 16, 2024
90c7a41
Adding BitfieldRo in BitMapCmdable interface (#2962)
akash14darshan Apr 21, 2024
fa9edec
Fix XGroup first pos key (#2983)
daviddzxy Apr 27, 2024
b64d9de
Handle IPv6 in isMovedError (#2981)
daviddzxy Apr 28, 2024
0f0a284
Remove skipping span creation by checking parent spans (#2980)
XSAM May 10, 2024
2d8fa02
fix: fix #2681 (#2998)
monkey92t May 29, 2024
8617e3c
should be able to create a redis ring from universal options and univ…
peczenyj Sep 21, 2023
b007cc0
add explanation for fields in ring
peczenyj Sep 21, 2023
ce24ac9
fix doc
peczenyj Sep 21, 2023
e594ee1
Merge remote-tracking branch 'refs/remotes/origin/should-be-able-to-c…
peczenyj Jun 2, 2024
567979a
skip ring test
peczenyj Jun 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
## Unreleased

### Changed

* `go-redis` won't skip span creation if the parent spans is not recording. ([#2980](/~https://github.com/redis/go-redis/issues/2980))
Users can use the OpenTelemetry sampler to control the sampling behavior.
For instance, you can use the `ParentBased(NeverSample())` sampler from `go.opentelemetry.io/otel/sdk/trace` to keep
a similar behavior (drop orphan spans) of `go-redis` as before.

## [9.0.5](/~https://github.com/redis/go-redis/compare/v9.0.4...v9.0.5) (2023-05-29)


Expand Down
3 changes: 0 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,6 @@ to this specification.

```go
import (
"context"
"fmt"

"github.com/redis/go-redis/v9"
)

Expand Down
1 change: 1 addition & 0 deletions bitmap_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type BitMapCmdable interface {
BitPos(ctx context.Context, key string, bit int64, pos ...int64) *IntCmd
BitPosSpan(ctx context.Context, key string, bit int8, start, end int64, span string) *IntCmd
BitField(ctx context.Context, key string, values ...interface{}) *IntSliceCmd
BitFieldRO(ctx context.Context, key string, values ...interface{}) *IntSliceCmd
}

func (c cmdable) GetBit(ctx context.Context, key string, offset int64) *IntCmd {
Expand Down
3 changes: 3 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net"
"strings"

"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/internal/proto"
)
Expand Down Expand Up @@ -129,7 +130,9 @@ func isMovedError(err error) (moved bool, ask bool, addr string) {
if ind == -1 {
return false, false, ""
}

addr = s[ind+1:]
addr = internal.GetAddr(addr)
return
}

Expand Down
13 changes: 12 additions & 1 deletion example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ func ExampleClient_Watch() {
return err
}

// Actual opperation (local in optimistic lock).
// Actual operation (local in optimistic lock).
n++

// Operation is committed only if the watched keys remain unchanged.
Expand Down Expand Up @@ -678,6 +678,17 @@ func ExampleNewUniversalClient_cluster() {
rdb.Ping(ctx)
}

func ExampleNewUniversalClient_ring() {
rdb := redis.NewUniversalClient(&redis.UniversalOptions{
AddressMap: map[string]string{
"shard1": ":7000",
"shard2": ":7001",
"shard3": ":7002",
},
})
rdb.Ping(ctx)
}

func ExampleClient_SlowLogGet() {
if RECluster {
// skip slowlog test for cluster
Expand Down
12 changes: 0 additions & 12 deletions extra/redisotel/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,6 @@ func newTracingHook(connString string, opts ...TracingOption) *tracingHook {

func (th *tracingHook) DialHook(hook redis.DialHook) redis.DialHook {
return func(ctx context.Context, network, addr string) (net.Conn, error) {
if !trace.SpanFromContext(ctx).IsRecording() {
return hook(ctx, network, addr)
}

ctx, span := th.conf.tracer.Start(ctx, "redis.dial", th.spanOpts...)
defer span.End()

Expand All @@ -109,10 +105,6 @@ func (th *tracingHook) DialHook(hook redis.DialHook) redis.DialHook {

func (th *tracingHook) ProcessHook(hook redis.ProcessHook) redis.ProcessHook {
return func(ctx context.Context, cmd redis.Cmder) error {
if !trace.SpanFromContext(ctx).IsRecording() {
return hook(ctx, cmd)
}

fn, file, line := funcFileLine("github.com/redis/go-redis")

attrs := make([]attribute.KeyValue, 0, 8)
Expand Down Expand Up @@ -145,10 +137,6 @@ func (th *tracingHook) ProcessPipelineHook(
hook redis.ProcessPipelineHook,
) redis.ProcessPipelineHook {
return func(ctx context.Context, cmds []redis.Cmder) error {
if !trace.SpanFromContext(ctx).IsRecording() {
return hook(ctx, cmds)
}

fn, file, line := funcFileLine("github.com/redis/go-redis")

attrs := make([]attribute.KeyValue, 0, 8)
Expand Down
17 changes: 17 additions & 0 deletions internal/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internal

import (
"context"
"net"
"strings"
"time"

Expand Down Expand Up @@ -64,3 +65,19 @@ func ReplaceSpaces(s string) string {

return builder.String()
}

func GetAddr(addr string) string {
ind := strings.LastIndexByte(addr, ':')
if ind == -1 {
return ""
}

if strings.IndexByte(addr, '.') != -1 {
return addr
}

if addr[0] == '[' {
return addr
}
return net.JoinHostPort(addr[:ind], addr[ind+1:])
}
21 changes: 21 additions & 0 deletions internal/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,24 @@ func TestIsLower(t *testing.T) {
Expect(isLower(str)).To(BeTrue())
})
}

func TestGetAddr(t *testing.T) {
It("getAddr", func() {
str := "127.0.0.1:1234"
Expect(GetAddr(str)).To(Equal(str))

str = "[::1]:1234"
Expect(GetAddr(str)).To(Equal(str))

str = "[fd01:abcd::7d03]:6379"
Expect(GetAddr(str)).To(Equal(str))

Expect(GetAddr("::1:1234")).To(Equal("[::1]:1234"))

Expect(GetAddr("fd01:abcd::7d03:6379")).To(Equal("[fd01:abcd::7d03]:6379"))

Expect(GetAddr("127.0.0.1")).To(Equal(""))

Expect(GetAddr("127")).To(Equal(""))
})
}
10 changes: 8 additions & 2 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ type Options struct {
// before reconnecting. It should return the current username and password.
CredentialsProvider func() (username string, password string)

// CredentialsProviderContext is an enhanced parameter of CredentialsProvider,
// done to maintain API compatibility. In the future,
// there might be a merge between CredentialsProviderContext and CredentialsProvider.
// There will be a conflict between them; if CredentialsProviderContext exists, we will ignore CredentialsProvider.
CredentialsProviderContext func(ctx context.Context) (username string, password string, err error)

// Database to be selected after connecting to the server.
DB int

Expand Down Expand Up @@ -250,12 +256,12 @@ func NewDialer(opt *Options) func(context.Context, string, string) (net.Conn, er
// - field names are mapped using snake-case conversion: to set MaxRetries, use max_retries
// - only scalar type fields are supported (bool, int, time.Duration)
// - for time.Duration fields, values must be a valid input for time.ParseDuration();
// additionally a plain integer as value (i.e. without unit) is intepreted as seconds
// additionally a plain integer as value (i.e. without unit) is interpreted as seconds
// - to disable a duration field, use value less than or equal to 0; to use the default
// value, leave the value blank or remove the parameter
// - only the last value is interpreted if a parameter is given multiple times
// - fields "network", "addr", "username" and "password" can only be set using other
// URL attributes (scheme, host, userinfo, resp.), query paremeters using these
// URL attributes (scheme, host, userinfo, resp.), query parameters using these
// names will be treated as unknown parameters
// - unknown parameter names will result in an error
//
Expand Down
28 changes: 17 additions & 11 deletions osscluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,11 @@ type ClusterOptions struct {

OnConnect func(ctx context.Context, cn *Conn) error

Protocol int
Username string
Password string
CredentialsProvider func() (username string, password string)
Protocol int
Username string
Password string
CredentialsProvider func() (username string, password string)
CredentialsProviderContext func(ctx context.Context) (username string, password string, err error)

MaxRetries int
MinRetryBackoff time.Duration
Expand Down Expand Up @@ -157,12 +158,12 @@ func (opt *ClusterOptions) init() {
// - field names are mapped using snake-case conversion: to set MaxRetries, use max_retries
// - only scalar type fields are supported (bool, int, time.Duration)
// - for time.Duration fields, values must be a valid input for time.ParseDuration();
// additionally a plain integer as value (i.e. without unit) is intepreted as seconds
// additionally a plain integer as value (i.e. without unit) is interpreted as seconds
// - to disable a duration field, use value less than or equal to 0; to use the default
// value, leave the value blank or remove the parameter
// - only the last value is interpreted if a parameter is given multiple times
// - fields "network", "addr", "username" and "password" can only be set using other
// URL attributes (scheme, host, userinfo, resp.), query paremeters using these
// URL attributes (scheme, host, userinfo, resp.), query parameters using these
// names will be treated as unknown parameters
// - unknown parameter names will result in an error
//
Expand Down Expand Up @@ -272,10 +273,11 @@ func (opt *ClusterOptions) clientOptions() *Options {
Dialer: opt.Dialer,
OnConnect: opt.OnConnect,

Protocol: opt.Protocol,
Username: opt.Username,
Password: opt.Password,
CredentialsProvider: opt.CredentialsProvider,
Protocol: opt.Protocol,
Username: opt.Username,
Password: opt.Password,
CredentialsProvider: opt.CredentialsProvider,
CredentialsProviderContext: opt.CredentialsProviderContext,

MaxRetries: opt.MaxRetries,
MinRetryBackoff: opt.MinRetryBackoff,
Expand Down Expand Up @@ -1295,6 +1297,7 @@ func (c *ClusterClient) processPipelineNode(
_ = node.Client.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
cn, err := node.Client.getConn(ctx)
if err != nil {
node.MarkAsFailing()
_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
setCmdsErr(cmds, err)
return err
Expand All @@ -1316,6 +1319,9 @@ func (c *ClusterClient) processPipelineNodeConn(
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmds(wr, cmds)
}); err != nil {
if isBadConn(err, false, node.Client.getAddr()) {
node.MarkAsFailing()
}
if shouldRetry(err, true) {
_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
}
Expand Down Expand Up @@ -1347,7 +1353,7 @@ func (c *ClusterClient) pipelineReadCmds(
continue
}

if c.opt.ReadOnly {
if c.opt.ReadOnly && isBadConn(err, false, node.Client.getAddr()) {
node.MarkAsFailing()
}

Expand Down
2 changes: 1 addition & 1 deletion pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ func (c *PubSub) getContext() context.Context {
// Receive* APIs can not be used after channel is created.
//
// go-redis periodically sends ping messages to test connection health
// and re-subscribes if ping can not not received for 1 minute.
// and re-subscribes if ping can not received for 1 minute.
func (c *PubSub) Channel(opts ...ChannelOption) <-chan *Message {
c.chOnce.Do(func() {
c.msgCh = newChannel(c, opts...)
Expand Down
11 changes: 8 additions & 3 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,13 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
}
cn.Inited = true

var err error
username, password := c.opt.Username, c.opt.Password
if c.opt.CredentialsProvider != nil {
if c.opt.CredentialsProviderContext != nil {
if username, password, err = c.opt.CredentialsProviderContext(ctx); err != nil {
return err
}
} else if c.opt.CredentialsProvider != nil {
username, password = c.opt.CredentialsProvider()
}

Expand All @@ -300,7 +305,7 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {

// for redis-server versions that do not support the HELLO command,
// RESP2 will continue to be used.
if err := conn.Hello(ctx, protocol, username, password, "").Err(); err == nil {
if err = conn.Hello(ctx, protocol, username, password, "").Err(); err == nil {
auth = true
} else if !isRedisError(err) {
// When the server responds with the RESP protocol and the result is not a normal
Expand All @@ -313,7 +318,7 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
return err
}

_, err := conn.Pipelined(ctx, func(pipe Pipeliner) error {
_, err = conn.Pipelined(ctx, func(pipe Pipeliner) error {
if !auth && password != "" {
if username != "" {
pipe.AuthACL(ctx, username, password)
Expand Down
4 changes: 2 additions & 2 deletions redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ var _ = Describe("Client OnConnect", func() {
})
})

var _ = Describe("Client context cancelation", func() {
var _ = Describe("Client context cancellation", func() {
var opt *redis.Options
var client *redis.Client

Expand All @@ -484,7 +484,7 @@ var _ = Describe("Client context cancelation", func() {
Expect(client.Close()).NotTo(HaveOccurred())
})

It("Blocking operation cancelation", func() {
It("Blocking operation cancellation", func() {
ctx, cancel := context.WithCancel(ctx)
cancel()

Expand Down
6 changes: 6 additions & 0 deletions stream_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,36 +178,42 @@ func (c cmdable) XReadStreams(ctx context.Context, streams ...string) *XStreamSl

func (c cmdable) XGroupCreate(ctx context.Context, stream, group, start string) *StatusCmd {
cmd := NewStatusCmd(ctx, "xgroup", "create", stream, group, start)
cmd.SetFirstKeyPos(2)
_ = c(ctx, cmd)
return cmd
}

func (c cmdable) XGroupCreateMkStream(ctx context.Context, stream, group, start string) *StatusCmd {
cmd := NewStatusCmd(ctx, "xgroup", "create", stream, group, start, "mkstream")
cmd.SetFirstKeyPos(2)
_ = c(ctx, cmd)
return cmd
}

func (c cmdable) XGroupSetID(ctx context.Context, stream, group, start string) *StatusCmd {
cmd := NewStatusCmd(ctx, "xgroup", "setid", stream, group, start)
cmd.SetFirstKeyPos(2)
_ = c(ctx, cmd)
return cmd
}

func (c cmdable) XGroupDestroy(ctx context.Context, stream, group string) *IntCmd {
cmd := NewIntCmd(ctx, "xgroup", "destroy", stream, group)
cmd.SetFirstKeyPos(2)
_ = c(ctx, cmd)
return cmd
}

func (c cmdable) XGroupCreateConsumer(ctx context.Context, stream, group, consumer string) *IntCmd {
cmd := NewIntCmd(ctx, "xgroup", "createconsumer", stream, group, consumer)
cmd.SetFirstKeyPos(2)
_ = c(ctx, cmd)
return cmd
}

func (c cmdable) XGroupDelConsumer(ctx context.Context, stream, group, consumer string) *IntCmd {
cmd := NewIntCmd(ctx, "xgroup", "delconsumer", stream, group, consumer)
cmd.SetFirstKeyPos(2)
_ = c(ctx, cmd)
return cmd
}
Expand Down
Loading
Loading