Skip to content

Commit

Permalink
Migrate frontend server to vanilla gRPC (#111)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin authored Feb 4, 2020
1 parent 0c36bf9 commit bf3dcf8
Show file tree
Hide file tree
Showing 24 changed files with 855 additions and 134 deletions.
40 changes: 22 additions & 18 deletions common/adapter/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

// Code generated by generate-adapter. DO NOT EDIT.

package adapter

import (
"fmt"

"github.com/gogo/status"
"go.temporal.io/temporal-proto/errordetails"
"go.uber.org/yarpc/encoding/protobuf"
"go.uber.org/yarpc/yarpcerrors"
"google.golang.org/grpc/codes"

"github.com/temporalio/temporal/.gen/go/shared"
)
Expand All @@ -38,42 +37,47 @@ func ToProtoError(in error) error {
return nil
}

var st *status.Status
switch thriftError := in.(type) {
case *shared.InternalServiceError:
return protobuf.NewError(yarpcerrors.CodeInternal, thriftError.Message)
st = status.New(codes.Internal, thriftError.Message)
case *shared.BadRequestError:
switch thriftError.Message {
case "No permission to do this operation.":
return protobuf.NewError(yarpcerrors.CodePermissionDenied, thriftError.Message)
st = status.New(codes.PermissionDenied, thriftError.Message)
case "Requested workflow history has passed retention period.":
return protobuf.NewError(yarpcerrors.CodeDeadlineExceeded, thriftError.Message)
st = status.New(codes.DeadlineExceeded, thriftError.Message)
default:
return protobuf.NewError(yarpcerrors.CodeInvalidArgument, thriftError.Message)
st = status.New(codes.InvalidArgument, thriftError.Message)
}

case *shared.DomainNotActiveError:
return errordetails.NewDomainNotActiveErrorYARPC(thriftError.Message, thriftError.DomainName, thriftError.CurrentCluster, thriftError.ActiveCluster)
st = errordetails.NewDomainNotActiveStatus(thriftError.Message, thriftError.DomainName, thriftError.CurrentCluster, thriftError.ActiveCluster)
case *shared.ServiceBusyError:
return protobuf.NewError(yarpcerrors.CodeResourceExhausted, thriftError.Message)
st = status.New(codes.ResourceExhausted, thriftError.Message)
case *shared.EntityNotExistsError:
return protobuf.NewError(yarpcerrors.CodeNotFound, thriftError.Message)
st = status.New(codes.NotFound, thriftError.Message)
case *shared.WorkflowExecutionAlreadyStartedError:
return errordetails.NewWorkflowExecutionAlreadyStartedErrorYARPC(*thriftError.Message, *thriftError.StartRequestId, *thriftError.RunId)
st = errordetails.NewWorkflowExecutionAlreadyStartedStatus(*thriftError.Message, *thriftError.StartRequestId, *thriftError.RunId)
case *shared.DomainAlreadyExistsError:
return protobuf.NewError(yarpcerrors.CodeAlreadyExists, thriftError.Message)
st = status.New(codes.AlreadyExists, thriftError.Message)
case *shared.CancellationAlreadyRequestedError:
return protobuf.NewError(yarpcerrors.CodeAlreadyExists, thriftError.Message)
st = status.New(codes.AlreadyExists, thriftError.Message)
case *shared.QueryFailedError:
return protobuf.NewError(yarpcerrors.CodeInvalidArgument, thriftError.Message)
st = status.New(codes.InvalidArgument, thriftError.Message)
case *shared.LimitExceededError:
return protobuf.NewError(yarpcerrors.CodeResourceExhausted, thriftError.Message)
st = status.New(codes.ResourceExhausted, thriftError.Message)
case *shared.ClientVersionNotSupportedError:
return errordetails.NewClientVersionNotSupportedErrorYARPC("Client version is not supported.", thriftError.FeatureVersion, thriftError.ClientImpl, thriftError.SupportedVersions)
st = errordetails.NewClientVersionNotSupportedStatus("Client version is not supported.", thriftError.FeatureVersion, thriftError.ClientImpl, thriftError.SupportedVersions)
case *yarpcerrors.Status:
if thriftError.Code() == yarpcerrors.CodeDeadlineExceeded {
return protobuf.NewError(yarpcerrors.CodeDeadlineExceeded, thriftError.Message())
st = status.New(codes.DeadlineExceeded, thriftError.Message())
}
}

return protobuf.NewError(yarpcerrors.CodeInternal, fmt.Sprintf("temporal internal uncategorized error, msg: %s", in.Error()))
if st == nil {
st = status.New(codes.Internal, fmt.Sprintf("temporal internal uncategorized error, msg: %s", in.Error()))
}

return st.Err()
}
35 changes: 32 additions & 3 deletions common/client/versionChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/hashicorp/go-version"
"go.uber.org/yarpc"
"google.golang.org/grpc/metadata"

"github.com/temporalio/temporal/.gen/go/shared"
"github.com/temporalio/temporal/common"
Expand Down Expand Up @@ -110,9 +111,9 @@ func (vc *versionChecker) ClientSupported(ctx context.Context, enableClientVersi
return nil
}

call := yarpc.CallFromContext(ctx)
clientFeatureVersion := call.Header(common.FeatureVersionHeaderName)
clientImpl := call.Header(common.ClientImplHeaderName)
headers := GetHeadersValue(ctx, common.FeatureVersionHeaderName, common.ClientImplHeaderName)
clientFeatureVersion := headers[0]
clientImpl := headers[1]

if clientFeatureVersion == "" {
return nil
Expand All @@ -131,6 +132,34 @@ func (vc *versionChecker) ClientSupported(ctx context.Context, enableClientVersi
return nil
}

func GetHeadersValue(ctx context.Context, headerNames ...string) []string {
md, grpcHeader := metadata.FromIncomingContext(ctx)
var call *yarpc.Call
if !grpcHeader {
// backward compatibility with YARPC
call = yarpc.CallFromContext(ctx)
if call == nil {
return make([]string, len(headerNames))
}
}

var headerValues []string
for _, headerName := range headerNames {
if call != nil {
headerValues = append(headerValues, call.Header(headerName))
} else {
values := md.Get(headerName)
if len(values) > 0 {
headerValues = append(headerValues, values[0])
} else {
headerValues = append(headerValues, "")
}
}
}

return headerValues
}

// SupportsStickyQuery returns error if sticky query is not supported otherwise nil.
// In case client version lookup fails assume the client does not support feature.
func (vc *versionChecker) SupportsStickyQuery(clientImpl string, clientFeatureVersion string) error {
Expand Down
3 changes: 3 additions & 0 deletions common/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package resource

import (
"net"

"go.temporal.io/temporal-proto/workflowservice"
"go.uber.org/yarpc"

Expand Down Expand Up @@ -105,5 +107,6 @@ type (
// for registering handlers
GetDispatcher() *yarpc.Dispatcher
GetGRPCDispatcher() *yarpc.Dispatcher
GetGRPCListener() net.Listener
}
)
21 changes: 18 additions & 3 deletions common/resource/resourceImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package resource

import (
"math/rand"
"net"
"os"
"sync/atomic"
"time"
Expand Down Expand Up @@ -117,6 +118,7 @@ type (
// for registering handlers
dispatcher *yarpc.Dispatcher
grpcDispatcher *yarpc.Dispatcher
grpcListener net.Listener

// for ringpop listener
ringpopDispatcher *yarpc.Dispatcher
Expand Down Expand Up @@ -150,7 +152,15 @@ func New(
}

dispatcher := params.RPCFactory.GetTChannelDispatcher()
grpcDispatcher := params.RPCFactory.GetGRPCDispatcher()

var grpcListener net.Listener
var grpcDispatcher *yarpc.Dispatcher
if serviceName == common.FrontendServiceName {
grpcListener = params.RPCFactory.GetGRPCListener()
} else {
grpcDispatcher = params.RPCFactory.GetGRPCDispatcher()
}

ringpopDispatcher := params.RPCFactory.GetRingpopDispatcher()

membershipMonitor, err := params.MembershipFactory.GetMembershipMonitor()
Expand Down Expand Up @@ -319,6 +329,7 @@ func New(

// for registering grpc handlers
grpcDispatcher: grpcDispatcher,
grpcListener: grpcListener,

// for ringpop listener
ringpopDispatcher: ringpopDispatcher,
Expand Down Expand Up @@ -375,8 +386,7 @@ func (h *Impl) Start() {
h.hostInfo = hostInfo

// The service is now started up
h.logger.Info("service started", tag.Address(hostInfo.GetAddress()))

h.logger.Info("Service resources started", tag.Address(hostInfo.GetAddress()))
// seed the random generator once for this service
rand.Seed(time.Now().UTC().UnixNano())
}
Expand Down Expand Up @@ -623,3 +633,8 @@ func (h *Impl) GetDispatcher() *yarpc.Dispatcher {
func (h *Impl) GetGRPCDispatcher() *yarpc.Dispatcher {
return h.grpcDispatcher
}

// GetGRPCListener return GRPC listener, used for registering handlers
func (h *Impl) GetGRPCListener() net.Listener {
return h.grpcListener
}
7 changes: 7 additions & 0 deletions common/resource/resourceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package resource

import (
"net"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/mock"
"github.com/uber-go/tally"
Expand Down Expand Up @@ -435,6 +437,11 @@ func (s *Test) GetGRPCDispatcher() *yarpc.Dispatcher {
panic("user should implement this method for test")
}

// GetGRPCListener for testing
func (s *Test) GetGRPCListener() net.Listener {
panic("user should implement this method for test")
}

// Finish checks whether expectations are met
func (s *Test) Finish(
t mock.TestingT,
Expand Down
3 changes: 3 additions & 0 deletions common/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package common

import (
"net"

"go.uber.org/yarpc"
"golang.org/x/net/context"
"google.golang.org/grpc"
Expand Down Expand Up @@ -55,6 +57,7 @@ type (
RPCFactory interface {
GetTChannelDispatcher() *yarpc.Dispatcher
GetGRPCDispatcher() *yarpc.Dispatcher
GetGRPCListener() net.Listener
GetRingpopDispatcher() *yarpc.Dispatcher
CreateTChannelDispatcherForOutbound(callerName, serviceName, hostName string) *yarpc.Dispatcher
CreateGRPCDispatcherForOutbound(callerName, serviceName, hostName string) *yarpc.Dispatcher
Expand Down
24 changes: 24 additions & 0 deletions common/service/config/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type RPCFactory struct {
sync.Mutex
dispatcher *yarpc.Dispatcher
grpcDispatcher *yarpc.Dispatcher
grpcListiner net.Listener
ringpopDispatcher *yarpc.Dispatcher
}

Expand Down Expand Up @@ -86,6 +87,29 @@ func (d *RPCFactory) GetGRPCDispatcher() *yarpc.Dispatcher {
return d.grpcDispatcher
}

// GetGRPCListener returns cached dispatcher for gRPC inbound or creates one
func (d *RPCFactory) GetGRPCListener() net.Listener {
if d.grpcListiner != nil {
return d.grpcListiner
}

d.Lock()
defer d.Unlock()

if d.grpcListiner == nil {
hostAddress := fmt.Sprintf("%v:%v", d.getListenIP(), d.config.GRPCPort)
var err error
d.grpcListiner, err = net.Listen("tcp", hostAddress)
if err != nil {
d.logger.Fatal("Failed create gRPC listener", tag.Error(err), tag.Service(d.serviceName), tag.Address(hostAddress))
}

d.logger.Info("Created gRPC listener", tag.Service(d.serviceName), tag.Address(hostAddress))
}

return d.grpcListiner
}

// GetTChannelDispatcher return a cached dispatcher
func (d *RPCFactory) GetTChannelDispatcher() *yarpc.Dispatcher {
d.Lock()
Expand Down
21 changes: 17 additions & 4 deletions common/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package service

import (
"math/rand"
"net"
"os"
"sync/atomic"
"time"
Expand Down Expand Up @@ -91,6 +92,7 @@ type (
hostInfo *membership.HostInfo
tchannelDispatcher *yarpc.Dispatcher
grpcDispatcher *yarpc.Dispatcher
grpcListener net.Listener
ringpopDispatcher *yarpc.Dispatcher
membershipFactory MembershipMonitorFactory
membershipMonitor membership.Monitor
Expand Down Expand Up @@ -148,9 +150,16 @@ func New(params *BootstrapParams) Service {
sVice.logger.Fatal("Unable to create yarpc TChannel dispatcher")
}

sVice.grpcDispatcher = sVice.rpcFactory.GetGRPCDispatcher()
if sVice.grpcDispatcher == nil {
sVice.logger.Fatal("Unable to create yarpc gRPC dispatcher")
if sVice.sName == common.FrontendServiceName {
sVice.grpcListener = sVice.rpcFactory.GetGRPCListener()
if sVice.grpcListener == nil {
sVice.logger.Fatal("Unable to create gRPC listener")
}
} else {
sVice.grpcDispatcher = sVice.rpcFactory.GetGRPCDispatcher()
if sVice.grpcDispatcher == nil {
sVice.logger.Fatal("Unable to create yarpc gRPC dispatcher")
}
}

sVice.ringpopDispatcher = sVice.rpcFactory.GetRingpopDispatcher()
Expand Down Expand Up @@ -228,7 +237,7 @@ func (h *serviceImpl) Start() {
}

// The service is now started up
h.logger.Info("service started")
h.logger.Info("Service started")
// seed the random generator once for this service
rand.Seed(time.Now().UTC().UnixNano())
}
Expand Down Expand Up @@ -294,6 +303,10 @@ func (h *serviceImpl) GetGRPCDispatcher() *yarpc.Dispatcher {
return h.grpcDispatcher
}

func (h *serviceImpl) GetGRPCListener() net.Listener {
return h.grpcListener
}

// GetClusterMetadata returns the service cluster metadata
func (h *serviceImpl) GetClusterMetadata() cluster.Metadata {
return h.clusterMetadata
Expand Down
7 changes: 7 additions & 0 deletions common/service/serviceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package service

import (
"net"

"github.com/temporalio/temporal/client"
"github.com/temporalio/temporal/common/archiver"
"github.com/temporalio/temporal/common/archiver/provider"
Expand Down Expand Up @@ -142,6 +144,11 @@ func (s *serviceTestBase) GetGRPCDispatcher() *yarpc.Dispatcher {
return nil
}

// GetDispatcher returns the gRPC dispatcher used by service
func (s *serviceTestBase) GetGRPCListener() net.Listener {
return nil
}

// GetMembershipMonitor returns the membership monitor used by service
func (s *serviceTestBase) GetMembershipMonitor() membership.Monitor {
return s.membershipMonitor
Expand Down
4 changes: 4 additions & 0 deletions common/service/serviceinterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package service

import (
"net"

"github.com/temporalio/temporal/client"
"github.com/temporalio/temporal/common/archiver"
"github.com/temporalio/temporal/common/archiver/provider"
Expand Down Expand Up @@ -61,6 +63,8 @@ type (

GetGRPCDispatcher() *yarpc.Dispatcher

GetGRPCListener() net.Listener

GetMembershipMonitor() membership.Monitor

GetHostInfo() *membership.HostInfo
Expand Down
Loading

0 comments on commit bf3dcf8

Please sign in to comment.