Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Imp: destroy invoker smoothly #1045

Merged
merged 3 commits into from
Feb 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Apache Dubbo-go [中文](./README_CN.md) #

[![Build Status](https://travis-ci.org/apache/dubbo-go.svg?branch=master)](https://travis-ci.org/apache/dubbo-go)
[![Build Status](https://github.com/apache/dubbo-go/workflows/CI/badge.svg)](https://travis-ci.org/apache/dubbo-go)
[![codecov](https://codecov.io/gh/apache/dubbo-go/branch/master/graph/badge.svg)](https://codecov.io/gh/apache/dubbo-go)
[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/github.com/apache/dubbo-go?tab=doc)
[![Go Report Card](https://goreportcard.com/badge/github.com/apache/dubbo-go)](https://goreportcard.com/report/github.com/apache/dubbo-go)
Expand Down
2 changes: 1 addition & 1 deletion README_CN.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Apache Dubbo-go [English](./README.md) #

[![Build Status](https://travis-ci.org/apache/dubbo-go.svg?branch=master)](https://travis-ci.org/apache/dubbo-go)
[![Build Status](https://github.com/apache/dubbo-go/workflows/CI/badge.svg)](https://travis-ci.org/apache/dubbo-go)
[![codecov](https://codecov.io/gh/apache/dubbo-go/branch/master/graph/badge.svg)](https://codecov.io/gh/apache/dubbo-go)
[![go.dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/github.com/apache/dubbo-go?tab=doc)
[![Go Report Card](https://goreportcard.com/badge/github.com/apache/dubbo-go)](https://goreportcard.com/report/github.com/apache/dubbo-go)
Expand Down
90 changes: 53 additions & 37 deletions protocol/dubbo/dubbo_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)

import (
"github.com/opentracing/opentracing-go"
perrors "github.com/pkg/errors"
)

import (
Expand All @@ -41,28 +39,20 @@ import (
"github.com/apache/dubbo-go/remoting"
)

var (
// ErrNoReply
ErrNoReply = perrors.New("request need @response")
// ErrDestroyedInvoker
ErrDestroyedInvoker = perrors.New("request Destroyed invoker")
)

var (
attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY,
constant.VERSION_KEY}
)

// DubboInvoker is implement of protocol.Invoker. A dubboInvoker refer to one service and ip.
// DubboInvoker is implement of protocol.Invoker. A dubboInvoker refers to one service and ip.
type DubboInvoker struct {
protocol.BaseInvoker
// the exchange layer, it is focus on network communication.
client *remoting.ExchangeClient
quitOnce sync.Once
clientGuard *sync.RWMutex
client *remoting.ExchangeClient
quitOnce sync.Once
// timeout for service(interface) level.
timeout time.Duration
// Used to record the number of requests. -1 represent this DubboInvoker is destroyed
reqNum int64
}

// NewDubboInvoker constructor
Expand All @@ -73,12 +63,28 @@ func NewDubboInvoker(url *common.URL, client *remoting.ExchangeClient) *DubboInv
if t, err := time.ParseDuration(requestTimeoutStr); err == nil {
requestTimeout = t
}
return &DubboInvoker{
di := &DubboInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url),
clientGuard: &sync.RWMutex{},
client: client,
reqNum: 0,
timeout: requestTimeout,
}

return di
}

func (di *DubboInvoker) setClient(client *remoting.ExchangeClient) {
di.clientGuard.Lock()
defer di.clientGuard.Unlock()

di.client = client
}

func (di *DubboInvoker) getClient() *remoting.ExchangeClient {
di.clientGuard.RLock()
defer di.clientGuard.RUnlock()

return di.client
}

// Invoke call remoting.
Expand All @@ -87,15 +93,30 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
err error
result protocol.RPCResult
)
if atomic.LoadInt64(&di.reqNum) < 0 {
if !di.BaseInvoker.IsAvailable() {
gaoxinge marked this conversation as resolved.
Show resolved Hide resolved
// Generally, the case will not happen, because the invoker has been removed
// from the invoker list before destroy,so no new request will enter the destroyed invoker
logger.Warnf("this dubboInvoker is destroyed")
result.Err = ErrDestroyedInvoker
result.Err = protocol.ErrDestroyedInvoker
return &result
}

di.clientGuard.RLock()
defer di.clientGuard.RUnlock()

if di.client == nil {
result.Err = protocol.ErrClientClosed
logger.Debugf("result.Err: %v", result.Err)
AlexStocks marked this conversation as resolved.
Show resolved Hide resolved
return &result
}

if !di.BaseInvoker.IsAvailable() {
// Generally, the case will not happen, because the invoker has been removed
// from the invoker list before destroy,so no new request will enter the destroyed invoker
logger.Warnf("this dubboInvoker is destroying")
result.Err = protocol.ErrDestroyedInvoker
return &result
}
atomic.AddInt64(&(di.reqNum), 1)
defer atomic.AddInt64(&(di.reqNum), -1)

inv := invocation.(*invocation_impl.RPCInvocation)
// init param
Expand Down Expand Up @@ -125,14 +146,13 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
timeout := di.getTimeout(inv)
if async {
if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok {
//result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response)
result.Err = di.client.AsyncRequest(&invocation, url, timeout, callBack, rest)
} else {
result.Err = di.client.Send(&invocation, url, timeout)
}
} else {
if inv.Reply() == nil {
result.Err = ErrNoReply
result.Err = protocol.ErrNoReply
} else {
result.Err = di.client.Request(&invocation, url, timeout, rest)
}
Expand Down Expand Up @@ -162,27 +182,23 @@ func (di *DubboInvoker) getTimeout(invocation *invocation_impl.RPCInvocation) ti
}

func (di *DubboInvoker) IsAvailable() bool {
return di.client.IsAvailable()
client := di.getClient()
if client != nil {
return client.IsAvailable()
}

return false
}

// Destroy destroy dubbo client invoker.
func (di *DubboInvoker) Destroy() {
di.quitOnce.Do(func() {
for {
if di.reqNum == 0 {
di.reqNum = -1
logger.Infof("dubboInvoker is destroyed,url:{%s}", di.GetUrl().Key())
di.BaseInvoker.Destroy()
if di.client != nil {
di.client.Close()
di.client = nil
}
break
}
logger.Warnf("DubboInvoker is to be destroyed, wait {%v} req end,url:{%s}", di.reqNum, di.GetUrl().Key())
time.Sleep(1 * time.Second)
di.BaseInvoker.Destroy()
client := di.getClient()
if client != nil {
di.setClient(nil)
client.Close()
}

})
}

Expand Down
73 changes: 63 additions & 10 deletions protocol/grpc/grpc_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,40 +24,82 @@ import (
)

import (
hessian2 "github.com/apache/dubbo-go-hessian2"
"github.com/pkg/errors"
"google.golang.org/grpc/connectivity"
)

import (
hessian2 "github.com/apache/dubbo-go-hessian2"

"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
)

var errNoReply = errors.New("request need @response")
var (
errNoReply = errors.New("request need @response")
)

// nolint
type GrpcInvoker struct {
protocol.BaseInvoker
quitOnce sync.Once
client *Client
quitOnce sync.Once
clientGuard *sync.RWMutex
client *Client
}

// NewGrpcInvoker returns a Grpc invoker instance
func NewGrpcInvoker(url *common.URL, client *Client) *GrpcInvoker {
return &GrpcInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url),
clientGuard: &sync.RWMutex{},
client: client,
}
}

func (gi *GrpcInvoker) setClient(client *Client) {
gi.clientGuard.Lock()
defer gi.clientGuard.Unlock()

gi.client = client
}

func (gi *GrpcInvoker) getClient() *Client {
gi.clientGuard.RLock()
defer gi.clientGuard.RUnlock()

return gi.client
}

// Invoke is used to call service method by invocation
func (gi *GrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
var (
result protocol.RPCResult
)

if !gi.BaseInvoker.IsAvailable() {
// Generally, the case will not happen, because the invoker has been removed
// from the invoker list before destroy,so no new request will enter the destroyed invoker
logger.Warnf("this grpcInvoker is destroyed")
result.Err = protocol.ErrDestroyedInvoker
return &result
}

gi.clientGuard.RLock()
defer gi.clientGuard.RUnlock()

if gi.client == nil {
result.Err = protocol.ErrClientClosed
return &result
}

if !gi.BaseInvoker.IsAvailable() {
// Generally, the case will not happen, because the invoker has been removed
// from the invoker list before destroy,so no new request will enter the destroyed invoker
logger.Warnf("this grpcInvoker is destroying")
result.Err = protocol.ErrDestroyedInvoker
return &result
}

if invocation.Reply() == nil {
result.Err = errNoReply
}
Expand All @@ -83,21 +125,32 @@ func (gi *GrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invocatio

// IsAvailable get available status
func (gi *GrpcInvoker) IsAvailable() bool {
return gi.BaseInvoker.IsAvailable() && gi.client.GetState() != connectivity.Shutdown
client := gi.getClient()
if client != nil {
return gi.BaseInvoker.IsAvailable() && client.GetState() != connectivity.Shutdown
}

return false
}

// IsDestroyed get destroyed status
func (gi *GrpcInvoker) IsDestroyed() bool {
return gi.BaseInvoker.IsDestroyed() && gi.client.GetState() == connectivity.Shutdown
client := gi.getClient()
if client != nil {
return gi.BaseInvoker.IsDestroyed() && client.GetState() == connectivity.Shutdown
}

return false
}

// Destroy will destroy gRPC's invoker and client, so it is only called once
func (gi *GrpcInvoker) Destroy() {
gi.quitOnce.Do(func() {
gi.BaseInvoker.Destroy()

if gi.client != nil {
_ = gi.client.Close()
client := gi.getClient()
if client != nil {
gi.setClient(nil)
client.Close()
}
})
}
38 changes: 27 additions & 11 deletions protocol/invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,25 @@ import (
"context"
)

import (
perrors "github.com/pkg/errors"
uatomic "go.uber.org/atomic"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
)

var (
// ErrClientClosed means client has clossed.
ErrClientClosed = perrors.New("remoting client has closed")
// ErrNoReply
ErrNoReply = perrors.New("request need @response")
// ErrDestroyedInvoker
ErrDestroyedInvoker = perrors.New("request Destroyed invoker")
)

// Invoker the service invocation interface for the consumer
//go:generate mockgen -source invoker.go -destination mock/mock_invoker.go -self_package github.com/apache/dubbo-go/protocol/mock --package mock Invoker
// Extension - Invoker
Expand All @@ -42,17 +56,19 @@ type Invoker interface {
// BaseInvoker provides default invoker implement
type BaseInvoker struct {
url *common.URL
available bool
destroyed bool
available uatomic.Bool
destroyed uatomic.Bool
}

// NewBaseInvoker creates a new BaseInvoker
func NewBaseInvoker(url *common.URL) *BaseInvoker {
return &BaseInvoker{
url: url,
available: true,
destroyed: false,
ivk := &BaseInvoker{
url: url,
}
ivk.available.Store(true)
ivk.destroyed.Store(false)

return ivk
}

// GetUrl gets base invoker URL
Expand All @@ -62,12 +78,12 @@ func (bi *BaseInvoker) GetUrl() *common.URL {

// IsAvailable gets available flag
func (bi *BaseInvoker) IsAvailable() bool {
return bi.available
return bi.available.Load()
}

// IsDestroyed gets destroyed flag
func (bi *BaseInvoker) IsDestroyed() bool {
return bi.destroyed
return bi.destroyed.Load()
}

// Invoke provides default invoker implement
Expand All @@ -77,7 +93,7 @@ func (bi *BaseInvoker) Invoke(context context.Context, invocation Invocation) Re

// Destroy changes available and destroyed flag
func (bi *BaseInvoker) Destroy() {
logger.Infof("Destroy invoker: %s", bi.GetUrl().String())
bi.destroyed = true
bi.available = false
logger.Infof("Destroy invoker: %s", bi.GetUrl())
bi.destroyed.Store(true)
bi.available.Store(false)
}
Loading