Skip to content

Commit

Permalink
Merge pull request #176 from art2ip/juno_24_0614
Browse files Browse the repository at this point in the history
Enhancements in go client library.
  • Loading branch information
NeetishPathak authored Jun 20, 2024
2 parents 60672b5 + 3d90c89 commit 4f99105
Show file tree
Hide file tree
Showing 25 changed files with 1,287 additions and 347 deletions.
35 changes: 13 additions & 22 deletions cmd/dbscanserv/prime/replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ package prime

import (
"errors"
"runtime"
"time"

"github.com/paypal/junodb/third_party/forked/golang/glog"
Expand All @@ -34,7 +33,7 @@ import (

var (
secConfig *sec.Config
processor []*cli.Processor
processor *cli.Processor
inChan = make(chan *proto.OperationalMessage, 1000)
outChan = make(chan bool, 1000)
)
Expand All @@ -57,39 +56,31 @@ func InitReplicator(proxyAddr string, numConns int) {
}

if numConns <= 0 {
numConns = 1
numConns = 2
}
if numConns > 4 {
numConns = 4
}

processor = make([]*cli.Processor, numConns)
for i := 0; i < numConns; i++ {
processor = cli.NewProcessor(
io.ServiceEndpoint{Addr: proxyAddr, SSLEnabled: secConfig != nil},
"dbscan",
numConns, // connPoolSize
time.Duration(500*time.Millisecond), // ConnectTimeout
time.Duration(1000*time.Millisecond), // ResponseTimeout
nil) // GetTLSConfig

processor[i] = cli.NewProcessor(
io.ServiceEndpoint{Addr: proxyAddr, SSLEnabled: secConfig != nil},
"dbscan",
time.Duration(500*time.Millisecond), // ConnectTimeout
time.Duration(1000*time.Millisecond), // RequestTimeout
time.Duration(60*time.Second)) // ConnectRecycleTimeout

processor[i].Start()

runtime.SetFinalizer(processor[i], func(p *cli.Processor) {
p.Close()
})

go processRequest(i)
}
processor.Start()
go processRequest()
}

func processRequest(k int) {
func processRequest() {
count := uint64(0)
for {
select {
case op := <-inChan:
for i := 0; i < 3; i++ {
reply, err := processor[k].ProcessRequest(op)
reply, err := processor.ProcessRequest(op)
if err != nil {
if i < 2 {
time.Sleep(10 * time.Millisecond)
Expand Down
12 changes: 4 additions & 8 deletions cmd/dbscanserv/prime/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,10 @@ func RepairKey(key []byte, display bool) bool {
if !found {

clientCfg := client.Config{
Appname: "dbscan",
Namespace: ns,
RetryCount: 1,
ConnectTimeout: util.Duration{500 * time.Millisecond},
ReadTimeout: util.Duration{500 * time.Millisecond},
WriteTimeout: util.Duration{500 * time.Millisecond},
RequestTimeout: util.Duration{1000 * time.Millisecond},
ConnRecycleTimeout: util.Duration{60 * time.Second},
Appname: "dbscan",
Namespace: ns,
ConnectTimeout: util.Duration{500 * time.Millisecond},
ResponseTimeout: util.Duration{1000 * time.Millisecond},
}
clientCfg.Server.Addr = proxyAddr

Expand Down
6 changes: 5 additions & 1 deletion cmd/tools/cmd/cfg/rtcfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/paypal/junodb/pkg/client"
"github.com/paypal/junodb/pkg/cmd"
"github.com/paypal/junodb/pkg/etcd"
"github.com/paypal/junodb/pkg/util"
)

const (
Expand Down Expand Up @@ -78,7 +79,10 @@ func (c *cmdRuntimeConfig) Parse(args []string) (err error) {
return
}
}
c.clientConfig.SetDefault()
c.clientConfig.DefaultTimeToLive = 1800
c.clientConfig.ConnPoolSize = 1
c.clientConfig.ConnectTimeout = util.Duration{1000 * time.Millisecond}
c.clientConfig.ResponseTimeout = util.Duration{1000 * time.Millisecond}

if cfg, e := c.config.GetConfig("Juno"); e == nil {
cfg.WriteTo(&c.clientConfig)
Expand Down
11 changes: 4 additions & 7 deletions cmd/tools/cmd/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,10 @@ const (
)

var defaultConfig = client.Config{
RetryCount: 1,
DefaultTimeToLive: 1800,
ConnectTimeout: util.Duration{100 * time.Millisecond},
ReadTimeout: util.Duration{500 * time.Millisecond},
WriteTimeout: util.Duration{500 * time.Millisecond},
RequestTimeout: util.Duration{1000 * time.Millisecond},
ConnRecycleTimeout: util.Duration{9 * time.Second},
DefaultTimeToLive: 1800,
ConnPoolSize: 1,
ConnectTimeout: util.Duration{100 * time.Millisecond},
ResponseTimeout: util.Duration{1000 * time.Millisecond},
}

type (
Expand Down
9 changes: 5 additions & 4 deletions cmd/tools/cmd/cli/sscli.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ import (
)

const (
connectTimeout = 100 * time.Millisecond
requestTimeout = 1000 * time.Millisecond
connectTimeout = 100 * time.Millisecond
responseTimeout = 1000 * time.Millisecond
)

type (
Expand Down Expand Up @@ -173,9 +173,10 @@ func (s *shardOptionsT) getShardId(key []byte) uint16 {

func newProcessor(cfg *client.Config) *cli.Processor {
processor := cli.NewProcessor(cfg.Server, cfg.Appname,
1, // connPoolSize
cfg.ConnectTimeout.Duration,
cfg.RequestTimeout.Duration,
cfg.ConnRecycleTimeout.Duration)
cfg.ResponseTimeout.Duration,
nil) // GetTLSConfig
processor.Start()
return processor
}
Expand Down
107 changes: 107 additions & 0 deletions internal/cli/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
//
// Copyright 2023 PayPal Inc.
//
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package cli

import (
"crypto/tls"
"errors"
"fmt"
"net"
"time"

"github.com/paypal/junodb/pkg/logging"
"github.com/paypal/junodb/pkg/logging/cal"
)

func TLSInitialized() bool {
return false
}

func Dial(addr string, timeout time.Duration, getTLSConfig func() *tls.Config) (conn net.Conn, err error) {
var tlsConn *tls.Conn

if getTLSConfig == nil {
return nil, errors.New("Unable to get TLS config")
}
timeStart := time.Now()
tlsCfg := getTLSConfig()
if tlsCfg == nil {
err = errors.New("Unable to get TLS config")
} else {
dialer := &net.Dialer{Timeout: timeout}
tlsConn, err = tls.DialWithDialer(dialer, "tcp", addr, tlsCfg)
conn = tlsConn
if tlsConn == nil && err == nil {
err = errors.New("Connect failed.")
}
}

if !cal.IsEnabled() {
return conn, err
}

// Cal logging
status := cal.StatusSuccess
b := logging.NewKVBuffer()
if err != nil {
status = cal.StatusError
b.Add([]byte("err"), err.Error())
} else {
b.Add([]byte("ssl"), getConnectionState(tlsConn))
}

cal.AtomicTransaction(cal.TxnTypeConnect, addr, status,
time.Since(timeStart), b.Bytes())

return conn, err
}

func getConnectionState(c *tls.Conn) string {
if c == nil {
return ""
}

st := c.ConnectionState()
rid := 0
if st.DidResume {
rid = 1
}
msg := fmt.Sprintf("GoTLS:%s:%s:ssl_r=%d", getVersionName(st.Version),
tls.CipherSuiteName(st.CipherSuite), rid)

return msg
}

func getVersionName(ver uint16) string {
switch ver {
case tls.VersionSSL30:
return "SSLv3"
case tls.VersionTLS10:
return "TLSv1"
case tls.VersionTLS11:
return "TLSv1.1"
case tls.VersionTLS12:
return "TLSv1.2"
case tls.VersionTLS13:
return "TLSv1.3"
default:
return ""
}
}
Loading

0 comments on commit 4f99105

Please sign in to comment.