Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate frontend server to vanilla gRPC #111

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 22 additions & 18 deletions common/adapter/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

// Code generated by generate-adapter. DO NOT EDIT.

package adapter

import (
"fmt"

"github.com/gogo/status"
"go.temporal.io/temporal-proto/errordetails"
"go.uber.org/yarpc/encoding/protobuf"
"go.uber.org/yarpc/yarpcerrors"
"google.golang.org/grpc/codes"

"github.com/temporalio/temporal/.gen/go/shared"
)
Expand All @@ -38,42 +37,47 @@ func ToProtoError(in error) error {
return nil
}

var st *status.Status
switch thriftError := in.(type) {
case *shared.InternalServiceError:
return protobuf.NewError(yarpcerrors.CodeInternal, thriftError.Message)
st = status.New(codes.Internal, thriftError.Message)
case *shared.BadRequestError:
switch thriftError.Message {
case "No permission to do this operation.":
return protobuf.NewError(yarpcerrors.CodePermissionDenied, thriftError.Message)
st = status.New(codes.PermissionDenied, thriftError.Message)
case "Requested workflow history has passed retention period.":
return protobuf.NewError(yarpcerrors.CodeDeadlineExceeded, thriftError.Message)
st = status.New(codes.DeadlineExceeded, thriftError.Message)
default:
return protobuf.NewError(yarpcerrors.CodeInvalidArgument, thriftError.Message)
st = status.New(codes.InvalidArgument, thriftError.Message)
}

case *shared.DomainNotActiveError:
return errordetails.NewDomainNotActiveErrorYARPC(thriftError.Message, thriftError.DomainName, thriftError.CurrentCluster, thriftError.ActiveCluster)
st = errordetails.NewDomainNotActiveStatus(thriftError.Message, thriftError.DomainName, thriftError.CurrentCluster, thriftError.ActiveCluster)
case *shared.ServiceBusyError:
return protobuf.NewError(yarpcerrors.CodeResourceExhausted, thriftError.Message)
st = status.New(codes.ResourceExhausted, thriftError.Message)
case *shared.EntityNotExistsError:
return protobuf.NewError(yarpcerrors.CodeNotFound, thriftError.Message)
st = status.New(codes.NotFound, thriftError.Message)
case *shared.WorkflowExecutionAlreadyStartedError:
return errordetails.NewWorkflowExecutionAlreadyStartedErrorYARPC(*thriftError.Message, *thriftError.StartRequestId, *thriftError.RunId)
st = errordetails.NewWorkflowExecutionAlreadyStartedStatus(*thriftError.Message, *thriftError.StartRequestId, *thriftError.RunId)
case *shared.DomainAlreadyExistsError:
return protobuf.NewError(yarpcerrors.CodeAlreadyExists, thriftError.Message)
st = status.New(codes.AlreadyExists, thriftError.Message)
case *shared.CancellationAlreadyRequestedError:
return protobuf.NewError(yarpcerrors.CodeAlreadyExists, thriftError.Message)
st = status.New(codes.AlreadyExists, thriftError.Message)
case *shared.QueryFailedError:
return protobuf.NewError(yarpcerrors.CodeInvalidArgument, thriftError.Message)
st = status.New(codes.InvalidArgument, thriftError.Message)
case *shared.LimitExceededError:
return protobuf.NewError(yarpcerrors.CodeResourceExhausted, thriftError.Message)
st = status.New(codes.ResourceExhausted, thriftError.Message)
case *shared.ClientVersionNotSupportedError:
return errordetails.NewClientVersionNotSupportedErrorYARPC("Client version is not supported.", thriftError.FeatureVersion, thriftError.ClientImpl, thriftError.SupportedVersions)
st = errordetails.NewClientVersionNotSupportedStatus("Client version is not supported.", thriftError.FeatureVersion, thriftError.ClientImpl, thriftError.SupportedVersions)
case *yarpcerrors.Status:
if thriftError.Code() == yarpcerrors.CodeDeadlineExceeded {
return protobuf.NewError(yarpcerrors.CodeDeadlineExceeded, thriftError.Message())
st = status.New(codes.DeadlineExceeded, thriftError.Message())
}
}

return protobuf.NewError(yarpcerrors.CodeInternal, fmt.Sprintf("temporal internal uncategorized error, msg: %s", in.Error()))
if st == nil {
st = status.New(codes.Internal, fmt.Sprintf("temporal internal uncategorized error, msg: %s", in.Error()))
}

return st.Err()
}
35 changes: 32 additions & 3 deletions common/client/versionChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/hashicorp/go-version"
"go.uber.org/yarpc"
"google.golang.org/grpc/metadata"

"github.com/temporalio/temporal/.gen/go/shared"
"github.com/temporalio/temporal/common"
Expand Down Expand Up @@ -110,9 +111,9 @@ func (vc *versionChecker) ClientSupported(ctx context.Context, enableClientVersi
return nil
}

call := yarpc.CallFromContext(ctx)
clientFeatureVersion := call.Header(common.FeatureVersionHeaderName)
clientImpl := call.Header(common.ClientImplHeaderName)
headers := GetHeadersValue(ctx, common.FeatureVersionHeaderName, common.ClientImplHeaderName)
clientFeatureVersion := headers[0]
clientImpl := headers[1]

if clientFeatureVersion == "" {
return nil
Expand All @@ -131,6 +132,34 @@ func (vc *versionChecker) ClientSupported(ctx context.Context, enableClientVersi
return nil
}

func GetHeadersValue(ctx context.Context, headerNames ...string) []string {
md, grpcHeader := metadata.FromIncomingContext(ctx)
var call *yarpc.Call
if !grpcHeader {
// backward compatibility with YARPC
call = yarpc.CallFromContext(ctx)
if call == nil {
return make([]string, len(headerNames))
}
}

var headerValues []string
for _, headerName := range headerNames {
if call != nil {
headerValues = append(headerValues, call.Header(headerName))
} else {
values := md.Get(headerName)
if len(values) > 0 {
headerValues = append(headerValues, values[0])
} else {
headerValues = append(headerValues, "")
}
}
}

return headerValues
}

// SupportsStickyQuery returns error if sticky query is not supported otherwise nil.
// In case client version lookup fails assume the client does not support feature.
func (vc *versionChecker) SupportsStickyQuery(clientImpl string, clientFeatureVersion string) error {
Expand Down
3 changes: 3 additions & 0 deletions common/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package resource

import (
"net"

"go.temporal.io/temporal-proto/workflowservice"
"go.uber.org/yarpc"

Expand Down Expand Up @@ -105,5 +107,6 @@ type (
// for registering handlers
GetDispatcher() *yarpc.Dispatcher
GetGRPCDispatcher() *yarpc.Dispatcher
GetGRPCListener() net.Listener
}
)
21 changes: 18 additions & 3 deletions common/resource/resourceImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package resource

import (
"math/rand"
"net"
"os"
"sync/atomic"
"time"
Expand Down Expand Up @@ -117,6 +118,7 @@ type (
// for registering handlers
dispatcher *yarpc.Dispatcher
grpcDispatcher *yarpc.Dispatcher
grpcListener net.Listener

// for ringpop listener
ringpopDispatcher *yarpc.Dispatcher
Expand Down Expand Up @@ -150,7 +152,15 @@ func New(
}

dispatcher := params.RPCFactory.GetTChannelDispatcher()
grpcDispatcher := params.RPCFactory.GetGRPCDispatcher()

var grpcListener net.Listener
var grpcDispatcher *yarpc.Dispatcher
if serviceName == common.FrontendServiceName {
grpcListener = params.RPCFactory.GetGRPCListener()
} else {
grpcDispatcher = params.RPCFactory.GetGRPCDispatcher()
}

ringpopDispatcher := params.RPCFactory.GetRingpopDispatcher()

membershipMonitor, err := params.MembershipFactory.GetMembershipMonitor()
Expand Down Expand Up @@ -319,6 +329,7 @@ func New(

// for registering grpc handlers
grpcDispatcher: grpcDispatcher,
grpcListener: grpcListener,

// for ringpop listener
ringpopDispatcher: ringpopDispatcher,
Expand Down Expand Up @@ -375,8 +386,7 @@ func (h *Impl) Start() {
h.hostInfo = hostInfo

// The service is now started up
h.logger.Info("service started", tag.Address(hostInfo.GetAddress()))

h.logger.Info("Service resources started", tag.Address(hostInfo.GetAddress()))
// seed the random generator once for this service
rand.Seed(time.Now().UTC().UnixNano())
}
Expand Down Expand Up @@ -623,3 +633,8 @@ func (h *Impl) GetDispatcher() *yarpc.Dispatcher {
func (h *Impl) GetGRPCDispatcher() *yarpc.Dispatcher {
return h.grpcDispatcher
}

// GetGRPCListener return GRPC listener, used for registering handlers
func (h *Impl) GetGRPCListener() net.Listener {
return h.grpcListener
}
7 changes: 7 additions & 0 deletions common/resource/resourceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package resource

import (
"net"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/mock"
"github.com/uber-go/tally"
Expand Down Expand Up @@ -435,6 +437,11 @@ func (s *Test) GetGRPCDispatcher() *yarpc.Dispatcher {
panic("user should implement this method for test")
}

// GetGRPCListener for testing
func (s *Test) GetGRPCListener() net.Listener {
panic("user should implement this method for test")
}

// Finish checks whether expectations are met
func (s *Test) Finish(
t mock.TestingT,
Expand Down
3 changes: 3 additions & 0 deletions common/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package common

import (
"net"

"go.uber.org/yarpc"
"golang.org/x/net/context"
"google.golang.org/grpc"
Expand Down Expand Up @@ -55,6 +57,7 @@ type (
RPCFactory interface {
GetTChannelDispatcher() *yarpc.Dispatcher
GetGRPCDispatcher() *yarpc.Dispatcher
GetGRPCListener() net.Listener
GetRingpopDispatcher() *yarpc.Dispatcher
CreateTChannelDispatcherForOutbound(callerName, serviceName, hostName string) *yarpc.Dispatcher
CreateGRPCDispatcherForOutbound(callerName, serviceName, hostName string) *yarpc.Dispatcher
Expand Down
24 changes: 24 additions & 0 deletions common/service/config/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type RPCFactory struct {
sync.Mutex
dispatcher *yarpc.Dispatcher
grpcDispatcher *yarpc.Dispatcher
grpcListiner net.Listener
ringpopDispatcher *yarpc.Dispatcher
}

Expand Down Expand Up @@ -86,6 +87,29 @@ func (d *RPCFactory) GetGRPCDispatcher() *yarpc.Dispatcher {
return d.grpcDispatcher
}

// GetGRPCListener returns cached dispatcher for gRPC inbound or creates one
func (d *RPCFactory) GetGRPCListener() net.Listener {
if d.grpcListiner != nil {
return d.grpcListiner
}

d.Lock()
defer d.Unlock()

if d.grpcListiner == nil {
hostAddress := fmt.Sprintf("%v:%v", d.getListenIP(), d.config.GRPCPort)
var err error
d.grpcListiner, err = net.Listen("tcp", hostAddress)
if err != nil {
d.logger.Fatal("Failed create gRPC listener", tag.Error(err), tag.Service(d.serviceName), tag.Address(hostAddress))
}

d.logger.Info("Created gRPC listener", tag.Service(d.serviceName), tag.Address(hostAddress))
}

return d.grpcListiner
}

// GetTChannelDispatcher return a cached dispatcher
func (d *RPCFactory) GetTChannelDispatcher() *yarpc.Dispatcher {
d.Lock()
Expand Down
21 changes: 17 additions & 4 deletions common/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package service

import (
"math/rand"
"net"
"os"
"sync/atomic"
"time"
Expand Down Expand Up @@ -91,6 +92,7 @@ type (
hostInfo *membership.HostInfo
tchannelDispatcher *yarpc.Dispatcher
grpcDispatcher *yarpc.Dispatcher
grpcListener net.Listener
ringpopDispatcher *yarpc.Dispatcher
membershipFactory MembershipMonitorFactory
membershipMonitor membership.Monitor
Expand Down Expand Up @@ -148,9 +150,16 @@ func New(params *BootstrapParams) Service {
sVice.logger.Fatal("Unable to create yarpc TChannel dispatcher")
}

sVice.grpcDispatcher = sVice.rpcFactory.GetGRPCDispatcher()
if sVice.grpcDispatcher == nil {
sVice.logger.Fatal("Unable to create yarpc gRPC dispatcher")
if sVice.sName == common.FrontendServiceName {
sVice.grpcListener = sVice.rpcFactory.GetGRPCListener()
if sVice.grpcListener == nil {
sVice.logger.Fatal("Unable to create gRPC listener")
}
} else {
sVice.grpcDispatcher = sVice.rpcFactory.GetGRPCDispatcher()
if sVice.grpcDispatcher == nil {
sVice.logger.Fatal("Unable to create yarpc gRPC dispatcher")
}
}

sVice.ringpopDispatcher = sVice.rpcFactory.GetRingpopDispatcher()
Expand Down Expand Up @@ -228,7 +237,7 @@ func (h *serviceImpl) Start() {
}

// The service is now started up
h.logger.Info("service started")
h.logger.Info("Service started")
// seed the random generator once for this service
rand.Seed(time.Now().UTC().UnixNano())
}
Expand Down Expand Up @@ -294,6 +303,10 @@ func (h *serviceImpl) GetGRPCDispatcher() *yarpc.Dispatcher {
return h.grpcDispatcher
}

func (h *serviceImpl) GetGRPCListener() net.Listener {
return h.grpcListener
}

// GetClusterMetadata returns the service cluster metadata
func (h *serviceImpl) GetClusterMetadata() cluster.Metadata {
return h.clusterMetadata
Expand Down
7 changes: 7 additions & 0 deletions common/service/serviceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package service

import (
"net"

"github.com/temporalio/temporal/client"
"github.com/temporalio/temporal/common/archiver"
"github.com/temporalio/temporal/common/archiver/provider"
Expand Down Expand Up @@ -142,6 +144,11 @@ func (s *serviceTestBase) GetGRPCDispatcher() *yarpc.Dispatcher {
return nil
}

// GetDispatcher returns the gRPC dispatcher used by service
func (s *serviceTestBase) GetGRPCListener() net.Listener {
return nil
}

// GetMembershipMonitor returns the membership monitor used by service
func (s *serviceTestBase) GetMembershipMonitor() membership.Monitor {
return s.membershipMonitor
Expand Down
4 changes: 4 additions & 0 deletions common/service/serviceinterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package service

import (
"net"

"github.com/temporalio/temporal/client"
"github.com/temporalio/temporal/common/archiver"
"github.com/temporalio/temporal/common/archiver/provider"
Expand Down Expand Up @@ -61,6 +63,8 @@ type (

GetGRPCDispatcher() *yarpc.Dispatcher

GetGRPCListener() net.Listener

GetMembershipMonitor() membership.Monitor

GetHostInfo() *membership.HostInfo
Expand Down
Loading