diff --git a/common/adapter/error.go b/common/adapter/error.go index 60f7b3f6750..781472e83dd 100644 --- a/common/adapter/error.go +++ b/common/adapter/error.go @@ -18,16 +18,15 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Code generated by generate-adapter. DO NOT EDIT. - package adapter import ( "fmt" + "github.com/gogo/status" "go.temporal.io/temporal-proto/errordetails" - "go.uber.org/yarpc/encoding/protobuf" "go.uber.org/yarpc/yarpcerrors" + "google.golang.org/grpc/codes" "github.com/temporalio/temporal/.gen/go/shared" ) @@ -38,42 +37,47 @@ func ToProtoError(in error) error { return nil } + var st *status.Status switch thriftError := in.(type) { case *shared.InternalServiceError: - return protobuf.NewError(yarpcerrors.CodeInternal, thriftError.Message) + st = status.New(codes.Internal, thriftError.Message) case *shared.BadRequestError: switch thriftError.Message { case "No permission to do this operation.": - return protobuf.NewError(yarpcerrors.CodePermissionDenied, thriftError.Message) + st = status.New(codes.PermissionDenied, thriftError.Message) case "Requested workflow history has passed retention period.": - return protobuf.NewError(yarpcerrors.CodeDeadlineExceeded, thriftError.Message) + st = status.New(codes.DeadlineExceeded, thriftError.Message) default: - return protobuf.NewError(yarpcerrors.CodeInvalidArgument, thriftError.Message) + st = status.New(codes.InvalidArgument, thriftError.Message) } case *shared.DomainNotActiveError: - return errordetails.NewDomainNotActiveErrorYARPC(thriftError.Message, thriftError.DomainName, thriftError.CurrentCluster, thriftError.ActiveCluster) + st = errordetails.NewDomainNotActiveStatus(thriftError.Message, thriftError.DomainName, thriftError.CurrentCluster, thriftError.ActiveCluster) case *shared.ServiceBusyError: - return protobuf.NewError(yarpcerrors.CodeResourceExhausted, thriftError.Message) + st = status.New(codes.ResourceExhausted, thriftError.Message) case *shared.EntityNotExistsError: - return protobuf.NewError(yarpcerrors.CodeNotFound, thriftError.Message) + st = status.New(codes.NotFound, thriftError.Message) case *shared.WorkflowExecutionAlreadyStartedError: - return errordetails.NewWorkflowExecutionAlreadyStartedErrorYARPC(*thriftError.Message, *thriftError.StartRequestId, *thriftError.RunId) + st = errordetails.NewWorkflowExecutionAlreadyStartedStatus(*thriftError.Message, *thriftError.StartRequestId, *thriftError.RunId) case *shared.DomainAlreadyExistsError: - return protobuf.NewError(yarpcerrors.CodeAlreadyExists, thriftError.Message) + st = status.New(codes.AlreadyExists, thriftError.Message) case *shared.CancellationAlreadyRequestedError: - return protobuf.NewError(yarpcerrors.CodeAlreadyExists, thriftError.Message) + st = status.New(codes.AlreadyExists, thriftError.Message) case *shared.QueryFailedError: - return protobuf.NewError(yarpcerrors.CodeInvalidArgument, thriftError.Message) + st = status.New(codes.InvalidArgument, thriftError.Message) case *shared.LimitExceededError: - return protobuf.NewError(yarpcerrors.CodeResourceExhausted, thriftError.Message) + st = status.New(codes.ResourceExhausted, thriftError.Message) case *shared.ClientVersionNotSupportedError: - return errordetails.NewClientVersionNotSupportedErrorYARPC("Client version is not supported.", thriftError.FeatureVersion, thriftError.ClientImpl, thriftError.SupportedVersions) + st = errordetails.NewClientVersionNotSupportedStatus("Client version is not supported.", thriftError.FeatureVersion, thriftError.ClientImpl, thriftError.SupportedVersions) case *yarpcerrors.Status: if thriftError.Code() == yarpcerrors.CodeDeadlineExceeded { - return protobuf.NewError(yarpcerrors.CodeDeadlineExceeded, thriftError.Message()) + st = status.New(codes.DeadlineExceeded, thriftError.Message()) } } - return protobuf.NewError(yarpcerrors.CodeInternal, fmt.Sprintf("temporal internal uncategorized error, msg: %s", in.Error())) + if st == nil { + st = status.New(codes.Internal, fmt.Sprintf("temporal internal uncategorized error, msg: %s", in.Error())) + } + + return st.Err() } diff --git a/common/client/versionChecker.go b/common/client/versionChecker.go index d552efc5314..9c8daf27550 100644 --- a/common/client/versionChecker.go +++ b/common/client/versionChecker.go @@ -26,6 +26,7 @@ import ( "github.com/hashicorp/go-version" "go.uber.org/yarpc" + "google.golang.org/grpc/metadata" "github.com/temporalio/temporal/.gen/go/shared" "github.com/temporalio/temporal/common" @@ -110,9 +111,9 @@ func (vc *versionChecker) ClientSupported(ctx context.Context, enableClientVersi return nil } - call := yarpc.CallFromContext(ctx) - clientFeatureVersion := call.Header(common.FeatureVersionHeaderName) - clientImpl := call.Header(common.ClientImplHeaderName) + headers := GetHeadersValue(ctx, common.FeatureVersionHeaderName, common.ClientImplHeaderName) + clientFeatureVersion := headers[0] + clientImpl := headers[1] if clientFeatureVersion == "" { return nil @@ -131,6 +132,34 @@ func (vc *versionChecker) ClientSupported(ctx context.Context, enableClientVersi return nil } +func GetHeadersValue(ctx context.Context, headerNames ...string) []string { + md, grpcHeader := metadata.FromIncomingContext(ctx) + var call *yarpc.Call + if !grpcHeader { + // backward compatibility with YARPC + call = yarpc.CallFromContext(ctx) + if call == nil { + return make([]string, len(headerNames)) + } + } + + var headerValues []string + for _, headerName := range headerNames { + if call != nil { + headerValues = append(headerValues, call.Header(headerName)) + } else { + values := md.Get(headerName) + if len(values) > 0 { + headerValues = append(headerValues, values[0]) + } else { + headerValues = append(headerValues, "") + } + } + } + + return headerValues +} + // SupportsStickyQuery returns error if sticky query is not supported otherwise nil. // In case client version lookup fails assume the client does not support feature. func (vc *versionChecker) SupportsStickyQuery(clientImpl string, clientFeatureVersion string) error { diff --git a/common/resource/resource.go b/common/resource/resource.go index 2ea0c1c5e4e..f6cf467ad18 100644 --- a/common/resource/resource.go +++ b/common/resource/resource.go @@ -21,6 +21,8 @@ package resource import ( + "net" + "go.temporal.io/temporal-proto/workflowservice" "go.uber.org/yarpc" @@ -105,5 +107,6 @@ type ( // for registering handlers GetDispatcher() *yarpc.Dispatcher GetGRPCDispatcher() *yarpc.Dispatcher + GetGRPCListener() net.Listener } ) diff --git a/common/resource/resourceImpl.go b/common/resource/resourceImpl.go index a77c38a1dc1..8aae82c3cc7 100644 --- a/common/resource/resourceImpl.go +++ b/common/resource/resourceImpl.go @@ -22,6 +22,7 @@ package resource import ( "math/rand" + "net" "os" "sync/atomic" "time" @@ -117,6 +118,7 @@ type ( // for registering handlers dispatcher *yarpc.Dispatcher grpcDispatcher *yarpc.Dispatcher + grpcListener net.Listener // for ringpop listener ringpopDispatcher *yarpc.Dispatcher @@ -150,7 +152,15 @@ func New( } dispatcher := params.RPCFactory.GetTChannelDispatcher() - grpcDispatcher := params.RPCFactory.GetGRPCDispatcher() + + var grpcListener net.Listener + var grpcDispatcher *yarpc.Dispatcher + if serviceName == common.FrontendServiceName { + grpcListener = params.RPCFactory.GetGRPCListener() + } else { + grpcDispatcher = params.RPCFactory.GetGRPCDispatcher() + } + ringpopDispatcher := params.RPCFactory.GetRingpopDispatcher() membershipMonitor, err := params.MembershipFactory.GetMembershipMonitor() @@ -319,6 +329,7 @@ func New( // for registering grpc handlers grpcDispatcher: grpcDispatcher, + grpcListener: grpcListener, // for ringpop listener ringpopDispatcher: ringpopDispatcher, @@ -375,8 +386,7 @@ func (h *Impl) Start() { h.hostInfo = hostInfo // The service is now started up - h.logger.Info("service started", tag.Address(hostInfo.GetAddress())) - + h.logger.Info("Service resources started", tag.Address(hostInfo.GetAddress())) // seed the random generator once for this service rand.Seed(time.Now().UTC().UnixNano()) } @@ -623,3 +633,8 @@ func (h *Impl) GetDispatcher() *yarpc.Dispatcher { func (h *Impl) GetGRPCDispatcher() *yarpc.Dispatcher { return h.grpcDispatcher } + +// GetGRPCListener return GRPC listener, used for registering handlers +func (h *Impl) GetGRPCListener() net.Listener { + return h.grpcListener +} diff --git a/common/resource/resourceTest.go b/common/resource/resourceTest.go index 1496c8ce463..4a5a6783a4a 100644 --- a/common/resource/resourceTest.go +++ b/common/resource/resourceTest.go @@ -21,6 +21,8 @@ package resource import ( + "net" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/mock" "github.com/uber-go/tally" @@ -435,6 +437,11 @@ func (s *Test) GetGRPCDispatcher() *yarpc.Dispatcher { panic("user should implement this method for test") } +// GetGRPCListener for testing +func (s *Test) GetGRPCListener() net.Listener { + panic("user should implement this method for test") +} + // Finish checks whether expectations are met func (s *Test) Finish( t mock.TestingT, diff --git a/common/rpc.go b/common/rpc.go index 463372f01ca..7773a66440e 100644 --- a/common/rpc.go +++ b/common/rpc.go @@ -21,6 +21,8 @@ package common import ( + "net" + "go.uber.org/yarpc" "golang.org/x/net/context" "google.golang.org/grpc" @@ -55,6 +57,7 @@ type ( RPCFactory interface { GetTChannelDispatcher() *yarpc.Dispatcher GetGRPCDispatcher() *yarpc.Dispatcher + GetGRPCListener() net.Listener GetRingpopDispatcher() *yarpc.Dispatcher CreateTChannelDispatcherForOutbound(callerName, serviceName, hostName string) *yarpc.Dispatcher CreateGRPCDispatcherForOutbound(callerName, serviceName, hostName string) *yarpc.Dispatcher diff --git a/common/service/config/rpc.go b/common/service/config/rpc.go index f1d09b322ec..4ed7de8973b 100644 --- a/common/service/config/rpc.go +++ b/common/service/config/rpc.go @@ -45,6 +45,7 @@ type RPCFactory struct { sync.Mutex dispatcher *yarpc.Dispatcher grpcDispatcher *yarpc.Dispatcher + grpcListiner net.Listener ringpopDispatcher *yarpc.Dispatcher } @@ -86,6 +87,29 @@ func (d *RPCFactory) GetGRPCDispatcher() *yarpc.Dispatcher { return d.grpcDispatcher } +// GetGRPCListener returns cached dispatcher for gRPC inbound or creates one +func (d *RPCFactory) GetGRPCListener() net.Listener { + if d.grpcListiner != nil { + return d.grpcListiner + } + + d.Lock() + defer d.Unlock() + + if d.grpcListiner == nil { + hostAddress := fmt.Sprintf("%v:%v", d.getListenIP(), d.config.GRPCPort) + var err error + d.grpcListiner, err = net.Listen("tcp", hostAddress) + if err != nil { + d.logger.Fatal("Failed create gRPC listener", tag.Error(err), tag.Service(d.serviceName), tag.Address(hostAddress)) + } + + d.logger.Info("Created gRPC listener", tag.Service(d.serviceName), tag.Address(hostAddress)) + } + + return d.grpcListiner +} + // GetTChannelDispatcher return a cached dispatcher func (d *RPCFactory) GetTChannelDispatcher() *yarpc.Dispatcher { d.Lock() diff --git a/common/service/service.go b/common/service/service.go index 5d6a395fa21..f59c8a2f888 100644 --- a/common/service/service.go +++ b/common/service/service.go @@ -22,6 +22,7 @@ package service import ( "math/rand" + "net" "os" "sync/atomic" "time" @@ -91,6 +92,7 @@ type ( hostInfo *membership.HostInfo tchannelDispatcher *yarpc.Dispatcher grpcDispatcher *yarpc.Dispatcher + grpcListener net.Listener ringpopDispatcher *yarpc.Dispatcher membershipFactory MembershipMonitorFactory membershipMonitor membership.Monitor @@ -148,9 +150,16 @@ func New(params *BootstrapParams) Service { sVice.logger.Fatal("Unable to create yarpc TChannel dispatcher") } - sVice.grpcDispatcher = sVice.rpcFactory.GetGRPCDispatcher() - if sVice.grpcDispatcher == nil { - sVice.logger.Fatal("Unable to create yarpc gRPC dispatcher") + if sVice.sName == common.FrontendServiceName { + sVice.grpcListener = sVice.rpcFactory.GetGRPCListener() + if sVice.grpcListener == nil { + sVice.logger.Fatal("Unable to create gRPC listener") + } + } else { + sVice.grpcDispatcher = sVice.rpcFactory.GetGRPCDispatcher() + if sVice.grpcDispatcher == nil { + sVice.logger.Fatal("Unable to create yarpc gRPC dispatcher") + } } sVice.ringpopDispatcher = sVice.rpcFactory.GetRingpopDispatcher() @@ -228,7 +237,7 @@ func (h *serviceImpl) Start() { } // The service is now started up - h.logger.Info("service started") + h.logger.Info("Service started") // seed the random generator once for this service rand.Seed(time.Now().UTC().UnixNano()) } @@ -294,6 +303,10 @@ func (h *serviceImpl) GetGRPCDispatcher() *yarpc.Dispatcher { return h.grpcDispatcher } +func (h *serviceImpl) GetGRPCListener() net.Listener { + return h.grpcListener +} + // GetClusterMetadata returns the service cluster metadata func (h *serviceImpl) GetClusterMetadata() cluster.Metadata { return h.clusterMetadata diff --git a/common/service/serviceTestBase.go b/common/service/serviceTestBase.go index 731c85fad12..554762c606c 100644 --- a/common/service/serviceTestBase.go +++ b/common/service/serviceTestBase.go @@ -21,6 +21,8 @@ package service import ( + "net" + "github.com/temporalio/temporal/client" "github.com/temporalio/temporal/common/archiver" "github.com/temporalio/temporal/common/archiver/provider" @@ -142,6 +144,11 @@ func (s *serviceTestBase) GetGRPCDispatcher() *yarpc.Dispatcher { return nil } +// GetDispatcher returns the gRPC dispatcher used by service +func (s *serviceTestBase) GetGRPCListener() net.Listener { + return nil +} + // GetMembershipMonitor returns the membership monitor used by service func (s *serviceTestBase) GetMembershipMonitor() membership.Monitor { return s.membershipMonitor diff --git a/common/service/serviceinterfaces.go b/common/service/serviceinterfaces.go index 6578d69ede9..a5fc9f05942 100644 --- a/common/service/serviceinterfaces.go +++ b/common/service/serviceinterfaces.go @@ -21,6 +21,8 @@ package service import ( + "net" + "github.com/temporalio/temporal/client" "github.com/temporalio/temporal/common/archiver" "github.com/temporalio/temporal/common/archiver/provider" @@ -61,6 +63,8 @@ type ( GetGRPCDispatcher() *yarpc.Dispatcher + GetGRPCListener() net.Listener + GetMembershipMonitor() membership.Monitor GetHostInfo() *membership.HostInfo diff --git a/host/onebox.go b/host/onebox.go index 7f2498eb5aa..054377d1cdf 100644 --- a/host/onebox.go +++ b/host/onebox.go @@ -901,6 +901,7 @@ type rpcFactoryImpl struct { sync.Mutex dispatcher *yarpc.Dispatcher + listener net.Listener ringpopDispatcher *yarpc.Dispatcher } @@ -934,6 +935,27 @@ func (c *rpcFactoryImpl) GetGRPCDispatcher() *yarpc.Dispatcher { }) } +func (c *rpcFactoryImpl) GetGRPCListener() net.Listener { + if c.listener != nil { + return c.listener + } + + c.Lock() + defer c.Unlock() + + if c.listener == nil { + var err error + c.listener, err = net.Listen("tcp", c.grpcHostPort) + if err != nil { + c.logger.Fatal("Failed create gRPC listener", tag.Error(err), tag.Service(c.serviceName), tag.Address(c.grpcHostPort)) + } + + c.logger.Info("Created gRPC listener", tag.Service(c.serviceName), tag.Address(c.grpcHostPort)) + } + + return c.listener +} + func (c *rpcFactoryImpl) GetTChannelDispatcher() *yarpc.Dispatcher { c.Lock() defer c.Unlock() diff --git a/service/frontend/accessControlledHandler.go b/service/frontend/accessControlledHandler.go index 6e73442eccf..dac54b83044 100644 --- a/service/frontend/accessControlledHandler.go +++ b/service/frontend/accessControlledHandler.go @@ -23,9 +23,9 @@ package frontend import ( "context" + "github.com/gogo/status" "go.temporal.io/temporal-proto/workflowservice" - "go.uber.org/yarpc/encoding/protobuf" - "go.uber.org/yarpc/yarpcerrors" + "google.golang.org/grpc/codes" "github.com/temporalio/temporal/.gen/proto/healthservice" "github.com/temporalio/temporal/common/authorization" @@ -34,7 +34,7 @@ import ( // TODO(vancexu): add metrics -var errUnauthorized = protobuf.NewError(yarpcerrors.CodePermissionDenied, "Request unauthorized.") +var errUnauthorized = status.New(codes.PermissionDenied, "Request unauthorized.").Err() // AccessControlledWorkflowHandler frontend handler wrapper for authentication and authorization type AccessControlledWorkflowHandler struct { @@ -42,12 +42,9 @@ type AccessControlledWorkflowHandler struct { frontendHandler workflowservice.WorkflowServiceYARPCServer authorizer authorization.Authorizer - - startFn func() - stopFn func() } -var _ workflowservice.WorkflowServiceYARPCServer = (*AccessControlledWorkflowHandler)(nil) +var _ workflowservice.WorkflowServiceServer = (*AccessControlledWorkflowHandler)(nil) // NewAccessControlledHandlerImpl creates frontend handler with authentication support func NewAccessControlledHandlerImpl(wfHandler *DCRedirectionHandlerImpl, authorizer authorization.Authorizer) *AccessControlledWorkflowHandler { @@ -59,35 +56,17 @@ func NewAccessControlledHandlerImpl(wfHandler *DCRedirectionHandlerImpl, authori Resource: wfHandler.Resource, frontendHandler: wfHandler, authorizer: authorizer, - startFn: func() { wfHandler.Start() }, - stopFn: func() { wfHandler.Stop() }, } } // TODO(vancexu): refactor frontend handler -// RegisterHandler register this handler, must be called before Start() -func (a *AccessControlledWorkflowHandler) RegisterHandler() { - a.GetGRPCDispatcher().Register(workflowservice.BuildWorkflowServiceYARPCProcedures(a)) - a.GetGRPCDispatcher().Register(healthservice.BuildMetaYARPCProcedures(a)) -} - // Health is for health check func (a *AccessControlledWorkflowHandler) Health(context.Context, *healthservice.HealthRequest) (*healthservice.HealthStatus, error) { hs := &healthservice.HealthStatus{Ok: true, Msg: "Frontend health check endpoint (gRPC) reached."} return hs, nil } -// Start starts the handler -func (a *AccessControlledWorkflowHandler) Start() { - a.startFn() -} - -// Stop stops the handler -func (a *AccessControlledWorkflowHandler) Stop() { - a.stopFn() -} - // CountWorkflowExecutions API call func (a *AccessControlledWorkflowHandler) CountWorkflowExecutions( ctx context.Context, diff --git a/service/frontend/adminHandler.go b/service/frontend/adminHandler.go index 4a40e5c0522..c9a9772f19e 100644 --- a/service/frontend/adminHandler.go +++ b/service/frontend/adminHandler.go @@ -95,17 +95,6 @@ func (adh *AdminHandler) RegisterHandler() { adh.GetDispatcher().Register(adminserviceserver.New(adh)) } -// Start starts the handler -func (adh *AdminHandler) Start() { - // Start domain replication queue cleanup - adh.Resource.GetDomainReplicationQueue().Start() -} - -// Stop stops the handler -func (adh *AdminHandler) Stop() { - adh.Resource.GetDomainReplicationQueue().Stop() -} - // AddSearchAttribute add search attribute to whitelist func (adh *AdminHandler) AddSearchAttribute( ctx context.Context, diff --git a/service/frontend/adminHandlerGRPC.go b/service/frontend/adminHandlerGRPC.go index 485aeef4e00..5c4fab5ee6c 100644 --- a/service/frontend/adminHandlerGRPC.go +++ b/service/frontend/adminHandlerGRPC.go @@ -28,7 +28,7 @@ import ( "github.com/temporalio/temporal/common/log" ) -var _ adminservice.AdminServiceYARPCServer = (*AdminHandlerGRPC)(nil) +var _ adminservice.AdminServiceServer = (*AdminHandlerGRPC)(nil) type ( // AdminHandlerGRPC - gRPC handler interface for workflow workflowservice @@ -48,10 +48,15 @@ func NewAdminHandlerGRPC( return handler } -// RegisterHandler register this handler, must be called before Start() -// if DCRedirectionHandler is also used, use RegisterHandler in DCRedirectionHandler instead -func (adh *AdminHandlerGRPC) RegisterHandler() { - adh.adminHandlerThrift.GetGRPCDispatcher().Register(adminservice.BuildAdminServiceYARPCProcedures(adh)) +// Start starts the handler +func (adh *AdminHandlerGRPC) Start() { + // Start domain replication queue cleanup + adh.adminHandlerThrift.Resource.GetDomainReplicationQueue().Start() +} + +// Stop stops the handler +func (adh *AdminHandlerGRPC) Stop() { + adh.adminHandlerThrift.Resource.GetDomainReplicationQueue().Stop() } // DescribeWorkflowExecution ... diff --git a/service/frontend/adminNilCheckHandler.go b/service/frontend/adminNilCheckHandler.go new file mode 100644 index 00000000000..25269ec3544 --- /dev/null +++ b/service/frontend/adminNilCheckHandler.go @@ -0,0 +1,159 @@ +// Copyright (c) 2019 Temporal Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package frontend + +import ( + "context" + + "github.com/temporalio/temporal/.gen/proto/adminservice" +) + +var _ adminservice.AdminServiceServer = (*AdminNilCheckHandler)(nil) + +type ( + // AdminNilCheckHandler - gRPC handler interface for workflow workflowservice + AdminNilCheckHandler struct { + parentHandler adminservice.AdminServiceServer + } +) + +// Due to bug in gogo/protobuf /~https://github.com/gogo/protobuf/issues/651 response can't be nil when error is also nil. +// This handler makes sure response is always not nil, when error is nil. +// Can be removed from pipeline when bug is resolved. + +// NewAdminNilCheckHandler creates handler that never returns nil response when error is nil +func NewAdminNilCheckHandler( + parentHandler adminservice.AdminServiceServer, +) *AdminNilCheckHandler { + handler := &AdminNilCheckHandler{ + parentHandler: parentHandler, + } + + return handler +} + +// DescribeWorkflowExecution ... +func (adh *AdminNilCheckHandler) DescribeWorkflowExecution(ctx context.Context, request *adminservice.DescribeWorkflowExecutionRequest) (_ *adminservice.DescribeWorkflowExecutionResponse, retError error) { + resp, err := adh.parentHandler.DescribeWorkflowExecution(ctx, request) + if resp == nil && err == nil { + return &adminservice.DescribeWorkflowExecutionResponse{}, err + } + return resp, err +} + +// DescribeHistoryHost ... +func (adh *AdminNilCheckHandler) DescribeHistoryHost(ctx context.Context, request *adminservice.DescribeHistoryHostRequest) (_ *adminservice.DescribeHistoryHostResponse, retError error) { + resp, err := adh.parentHandler.DescribeHistoryHost(ctx, request) + if resp == nil && err == nil { + return &adminservice.DescribeHistoryHostResponse{}, err + } + return resp, err +} + +// CloseShard ... +func (adh *AdminNilCheckHandler) CloseShard(ctx context.Context, request *adminservice.CloseShardRequest) (_ *adminservice.CloseShardResponse, retError error) { + resp, err := adh.parentHandler.CloseShard(ctx, request) + if resp == nil && err == nil { + return &adminservice.CloseShardResponse{}, err + } + return resp, err +} + +// RemoveTask ... +func (adh *AdminNilCheckHandler) RemoveTask(ctx context.Context, request *adminservice.RemoveTaskRequest) (_ *adminservice.RemoveTaskResponse, retError error) { + resp, err := adh.parentHandler.RemoveTask(ctx, request) + if resp == nil && err == nil { + return &adminservice.RemoveTaskResponse{}, err + } + return resp, err +} + +// GetWorkflowExecutionRawHistory ... +func (adh *AdminNilCheckHandler) GetWorkflowExecutionRawHistory(ctx context.Context, request *adminservice.GetWorkflowExecutionRawHistoryRequest) (_ *adminservice.GetWorkflowExecutionRawHistoryResponse, retError error) { + resp, err := adh.parentHandler.GetWorkflowExecutionRawHistory(ctx, request) + if resp == nil && err == nil { + return &adminservice.GetWorkflowExecutionRawHistoryResponse{}, err + } + return resp, err +} + +// GetWorkflowExecutionRawHistoryV2 ... +func (adh *AdminNilCheckHandler) GetWorkflowExecutionRawHistoryV2(ctx context.Context, request *adminservice.GetWorkflowExecutionRawHistoryV2Request) (_ *adminservice.GetWorkflowExecutionRawHistoryV2Response, retError error) { + resp, err := adh.parentHandler.GetWorkflowExecutionRawHistoryV2(ctx, request) + if resp == nil && err == nil { + return &adminservice.GetWorkflowExecutionRawHistoryV2Response{}, err + } + return resp, err +} + +// AddSearchAttribute ... +func (adh *AdminNilCheckHandler) AddSearchAttribute(ctx context.Context, request *adminservice.AddSearchAttributeRequest) (_ *adminservice.AddSearchAttributeResponse, retError error) { + resp, err := adh.parentHandler.AddSearchAttribute(ctx, request) + if resp == nil && err == nil { + return &adminservice.AddSearchAttributeResponse{}, err + } + return resp, err +} + +// DescribeCluster ... +func (adh *AdminNilCheckHandler) DescribeCluster(ctx context.Context, request *adminservice.DescribeClusterRequest) (_ *adminservice.DescribeClusterResponse, retError error) { + resp, err := adh.parentHandler.DescribeCluster(ctx, request) + if resp == nil && err == nil { + return &adminservice.DescribeClusterResponse{}, err + } + return resp, err +} + +// GetReplicationMessages ... +func (adh *AdminNilCheckHandler) GetReplicationMessages(ctx context.Context, request *adminservice.GetReplicationMessagesRequest) (_ *adminservice.GetReplicationMessagesResponse, retError error) { + resp, err := adh.parentHandler.GetReplicationMessages(ctx, request) + if resp == nil && err == nil { + return &adminservice.GetReplicationMessagesResponse{}, err + } + return resp, err +} + +// GetDomainReplicationMessages ... +func (adh *AdminNilCheckHandler) GetDomainReplicationMessages(ctx context.Context, request *adminservice.GetDomainReplicationMessagesRequest) (_ *adminservice.GetDomainReplicationMessagesResponse, retError error) { + resp, err := adh.parentHandler.GetDomainReplicationMessages(ctx, request) + if resp == nil && err == nil { + return &adminservice.GetDomainReplicationMessagesResponse{}, err + } + return resp, err +} + +// GetDLQReplicationMessages ... +func (adh *AdminNilCheckHandler) GetDLQReplicationMessages(ctx context.Context, request *adminservice.GetDLQReplicationMessagesRequest) (_ *adminservice.GetDLQReplicationMessagesResponse, retError error) { + resp, err := adh.parentHandler.GetDLQReplicationMessages(ctx, request) + if resp == nil && err == nil { + return &adminservice.GetDLQReplicationMessagesResponse{}, err + } + return resp, err +} + +// ReapplyEvents ... +func (adh *AdminNilCheckHandler) ReapplyEvents(ctx context.Context, request *adminservice.ReapplyEventsRequest) (_ *adminservice.ReapplyEventsResponse, retError error) { + resp, err := adh.parentHandler.ReapplyEvents(ctx, request) + if resp == nil && err == nil { + return &adminservice.ReapplyEventsResponse{}, err + } + return resp, err +} diff --git a/service/frontend/dcRedirectionHandler.go b/service/frontend/dcRedirectionHandler.go index 77323a1a7d4..3cc468d0f74 100644 --- a/service/frontend/dcRedirectionHandler.go +++ b/service/frontend/dcRedirectionHandler.go @@ -33,7 +33,7 @@ import ( "github.com/temporalio/temporal/common/service/config" ) -var _ workflowservice.WorkflowServiceYARPCServer = (*DCRedirectionHandlerImpl)(nil) +var _ workflowservice.WorkflowServiceServer = (*DCRedirectionHandlerImpl)(nil) type ( // DCRedirectionHandlerImpl is simple wrapper over frontend service, doing redirection based on policy @@ -45,9 +45,6 @@ type ( redirectionPolicy DCRedirectionPolicy tokenSerializer common.TaskTokenSerializer frontendHandler workflowservice.WorkflowServiceYARPCServer - - startFn func() - stopFn func() } ) @@ -70,26 +67,9 @@ func NewDCRedirectionHandler( redirectionPolicy: dcRedirectionPolicy, tokenSerializer: common.NewJSONTaskTokenSerializer(), frontendHandler: wfHandler, - startFn: func() { wfHandler.workflowHandlerThrift.Start() }, - stopFn: func() { wfHandler.workflowHandlerThrift.Stop() }, } } -// RegisterHandler register this handler, must be called before Start() -func (handler *DCRedirectionHandlerImpl) RegisterHandler() { - handler.GetGRPCDispatcher().Register(workflowservice.BuildWorkflowServiceYARPCProcedures(handler)) -} - -// Start starts the handler -func (handler *DCRedirectionHandlerImpl) Start() { - handler.startFn() -} - -// Stop stops the handler -func (handler *DCRedirectionHandlerImpl) Stop() { - handler.stopFn() -} - // Domain APIs, domain APIs does not require redirection // DeprecateDomain API call diff --git a/service/frontend/dcRedirectionHandler_test.go b/service/frontend/dcRedirectionHandler_test.go index 004c53947e1..7d39688a11e 100644 --- a/service/frontend/dcRedirectionHandler_test.go +++ b/service/frontend/dcRedirectionHandler_test.go @@ -47,7 +47,7 @@ type ( controller *gomock.Controller mockResource *resource.Test - mockFrontendHandler *workflowservicemock.MockWorkflowServiceYARPCServer + mockFrontendHandler *workflowservicemock.MockWorkflowServiceServer mockRemoteFrontendClient *workflowservicemock.MockWorkflowServiceClient mockClusterMetadata *cluster.MockMetadata @@ -97,7 +97,7 @@ func (s *dcRedirectionHandlerSuite) SetupTest() { frontendHandlerGRPC := NewWorkflowHandlerGRPC(frontendHandler) - s.mockFrontendHandler = workflowservicemock.NewMockWorkflowServiceYARPCServer(s.controller) + s.mockFrontendHandler = workflowservicemock.NewMockWorkflowServiceServer(s.controller) s.handler = NewDCRedirectionHandler(frontendHandlerGRPC, config.DCRedirectionPolicy{}) s.handler.frontendHandler = s.mockFrontendHandler s.handler.redirectionPolicy = s.mockDCRedirectionPolicy diff --git a/service/frontend/dcRedirectionPolicy.go b/service/frontend/dcRedirectionPolicy.go index b8572036af1..c890c6f8c94 100644 --- a/service/frontend/dcRedirectionPolicy.go +++ b/service/frontend/dcRedirectionPolicy.go @@ -24,11 +24,10 @@ import ( "context" "fmt" - "go.uber.org/yarpc" - "github.com/temporalio/temporal/.gen/go/shared" "github.com/temporalio/temporal/common" "github.com/temporalio/temporal/common/cache" + "github.com/temporalio/temporal/common/client" "github.com/temporalio/temporal/common/cluster" "github.com/temporalio/temporal/common/service/config" ) @@ -169,8 +168,7 @@ func (policy *SelectedAPIsForwardingRedirectionPolicy) getTargetClusterAndIsDoma return policy.currentClusterName, false } - call := yarpc.CallFromContext(ctx) - enforceDCRedirection := call.Header(common.EnforceDCRedirection) + enforceDCRedirection := client.GetHeadersValue(ctx, common.EnforceDCRedirection)[0] if !policy.config.EnableDomainNotActiveAutoForwarding(domainEntry.GetInfo().Name) && enforceDCRedirection != "true" { // do not do dc redirection if auto-forwarding dynamic config and EnforceDCRedirection context flag is not enabled return policy.currentClusterName, false diff --git a/service/frontend/service.go b/service/frontend/service.go index 6422609886d..4e5d3f36ba2 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -24,7 +24,11 @@ import ( "sync/atomic" "github.com/stretchr/testify/mock" + "go.temporal.io/temporal-proto/workflowservice" + "google.golang.org/grpc" + "github.com/temporalio/temporal/.gen/proto/adminservice" + "github.com/temporalio/temporal/.gen/proto/healthservice" "github.com/temporalio/temporal/common" "github.com/temporalio/temporal/common/definition" "github.com/temporalio/temporal/common/domain" @@ -130,11 +134,11 @@ type Service struct { resource.Resource status int32 - stopC chan struct{} config *Config params *service.BootstrapParams - adminHandler *AdminHandler + adminHandlerGRPC *AdminHandlerGRPC + server *grpc.Server } // NewService builds a new cadence-frontend service @@ -193,7 +197,6 @@ func NewService( Resource: serviceResource, status: common.DaemonStatusInitialized, config: serviceConfig, - stopC: make(chan struct{}), params: params, }, nil } @@ -226,28 +229,37 @@ func (s *Service) Start() { replicationMessageSink.(*mocks.KafkaProducer).On("Publish", mock.Anything).Return(nil) } + s.server = grpc.NewServer() + wfHandler := NewWorkflowHandler(s, s.config, replicationMessageSink) wfHandlerGRPC := NewWorkflowHandlerGRPC(wfHandler) dcRedirectionHandler := NewDCRedirectionHandler(wfHandlerGRPC, s.params.DCRedirectionPolicy) accessControlledWorkflowHandler := NewAccessControlledHandlerImpl(dcRedirectionHandler, s.params.Authorizer) + workflowNilCheckHandler := NewWorkflowNilCheckHandler(accessControlledWorkflowHandler) + + workflowservice.RegisterWorkflowServiceServer(s.server, workflowNilCheckHandler) + healthservice.RegisterMetaServer(s.server, accessControlledWorkflowHandler) - accessControlledWorkflowHandler.RegisterHandler() - wfHandler.RegisterHandler() + adminHandler := NewAdminHandler(s, s.params, s.config) + s.adminHandlerGRPC = NewAdminHandlerGRPC(adminHandler) + adminNilCheckHandler := NewAdminNilCheckHandler(s.adminHandlerGRPC) - s.adminHandler = NewAdminHandler(s, s.params, s.config) - adminHandlerGRPC := NewAdminHandlerGRPC(s.adminHandler) - adminHandlerGRPC.RegisterHandler() - s.adminHandler.RegisterHandler() + adminservice.RegisterAdminServiceServer(s.server, adminNilCheckHandler) + + wfHandler.RegisterHandler() // Thrift version + adminHandler.RegisterHandler() // Thrift version // must start resource first s.Resource.Start() - s.adminHandler.Start() + s.adminHandlerGRPC.Start() - // base (service is not started in frontend or admin handler) in case of race condition in yarpc registration function - - logger.Info("frontend started") + listener := s.GetGRPCListener() + logger.Info("Starting to serve on frontend listener") + if err := s.server.Serve(listener); err != nil { + logger.Fatal("Failed to serve on frontend listener", tag.Error(err)) + } - <-s.stopC + // base (service is not started in frontend or admin handler) in case of race condition in yarpc registration function } // Stop stops the service @@ -256,9 +268,9 @@ func (s *Service) Stop() { return } - close(s.stopC) + s.server.GracefulStop() - s.adminHandler.Stop() + s.adminHandlerGRPC.Stop() s.Resource.Stop() s.params.Logger.Info("frontend stopped") diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index 0a3a73dfd05..852f152d937 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -27,11 +27,9 @@ import ( "time" "github.com/pborman/uuid" - "go.uber.org/yarpc" "go.uber.org/yarpc/yarpcerrors" "github.com/temporalio/temporal/.gen/go/health" - "github.com/temporalio/temporal/.gen/go/health/metaserver" h "github.com/temporalio/temporal/.gen/go/history" m "github.com/temporalio/temporal/.gen/go/matching" gen "github.com/temporalio/temporal/.gen/go/shared" @@ -173,15 +171,6 @@ func NewWorkflowHandler( // if DCRedirectionHandler is also used, use RegisterHandler in DCRedirectionHandler instead func (wh *WorkflowHandler) RegisterHandler() { wh.GetDispatcher().Register(workflowserviceserver.New(wh)) - wh.GetDispatcher().Register(metaserver.New(wh)) -} - -// Start starts the handler -func (wh *WorkflowHandler) Start() { -} - -// Stop stops the handler -func (wh *WorkflowHandler) Stop() { } // Health is for health check @@ -427,6 +416,7 @@ func (wh *WorkflowHandler) PollForActivityTask( return nil, wh.error(err, scope) } } + return resp, nil } @@ -1543,11 +1533,10 @@ func (wh *WorkflowHandler) RespondQueryTaskCompleted( } } - call := yarpc.CallFromContext(ctx) - + headers := client.GetHeadersValue(ctx, common.ClientImplHeaderName, common.FeatureVersionHeaderName) completeRequest.WorkerVersionInfo = &gen.WorkerVersionInfo{ - Impl: common.StringPtr(call.Header(common.ClientImplHeaderName)), - FeatureVersion: common.StringPtr(call.Header(common.FeatureVersionHeaderName)), + Impl: common.StringPtr(headers[0]), + FeatureVersion: common.StringPtr(headers[1]), } matchingRequest := &m.RespondQueryTaskCompletedRequest{ DomainUUID: common.StringPtr(queryTaskToken.DomainID), diff --git a/service/frontend/workflowHandlerGRPC.go b/service/frontend/workflowHandlerGRPC.go index 845e78ae79c..77b57e1bef2 100644 --- a/service/frontend/workflowHandlerGRPC.go +++ b/service/frontend/workflowHandlerGRPC.go @@ -29,7 +29,7 @@ import ( "github.com/temporalio/temporal/common/log" ) -var _ workflowservice.WorkflowServiceYARPCServer = (*WorkflowHandlerGRPC)(nil) +var _ workflowservice.WorkflowServiceServer = (*WorkflowHandlerGRPC)(nil) type ( // WorkflowHandlerGRPC - gRPC handler interface for workflow workflowservice diff --git a/service/frontend/workflowNilCheckHandler.go b/service/frontend/workflowNilCheckHandler.go new file mode 100644 index 00000000000..2ac218a4124 --- /dev/null +++ b/service/frontend/workflowNilCheckHandler.go @@ -0,0 +1,480 @@ +// Copyright (c) 2019 Temporal Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package frontend + +import ( + "context" + + "go.temporal.io/temporal-proto/workflowservice" +) + +var _ workflowservice.WorkflowServiceServer = (*WorkflowNilCheckHandler)(nil) + +type ( + // WorkflowNilCheckHandler - gRPC handler interface for workflow workflowservice + WorkflowNilCheckHandler struct { + parentHandler workflowservice.WorkflowServiceServer + } +) + +// Due to bug in gogo/protobuf /~https://github.com/gogo/protobuf/issues/651 response can't be nil when error is also nil. +// This handler makes sure response is always not nil, when error is nil. +// Can be removed from pipeline when bug is resolved. + +// NewWorkflowNilCheckHandler creates handler that never returns nil response when error is nil +func NewWorkflowNilCheckHandler( + parentHandler workflowservice.WorkflowServiceServer, +) *WorkflowNilCheckHandler { + handler := &WorkflowNilCheckHandler{ + parentHandler: parentHandler, + } + + return handler +} + +// RegisterDomain creates a new domain which can be used as a container for all resources. Domain is a top level +// entity within Cadence, used as a container for all resources like workflow executions, tasklists, etc. Domain +// acts as a sandbox and provides isolation for all resources within the domain. All resources belongs to exactly one +// domain. +func (wh *WorkflowNilCheckHandler) RegisterDomain(ctx context.Context, request *workflowservice.RegisterDomainRequest) (_ *workflowservice.RegisterDomainResponse, retError error) { + resp, err := wh.parentHandler.RegisterDomain(ctx, request) + if resp == nil && err == nil { + return &workflowservice.RegisterDomainResponse{}, err + } + return resp, err +} + +// DescribeDomain returns the information and configuration for a registered domain. +func (wh *WorkflowNilCheckHandler) DescribeDomain(ctx context.Context, request *workflowservice.DescribeDomainRequest) (_ *workflowservice.DescribeDomainResponse, retError error) { + resp, err := wh.parentHandler.DescribeDomain(ctx, request) + if resp == nil && err == nil { + return &workflowservice.DescribeDomainResponse{}, err + } + return resp, err +} + +// ListDomains returns the information and configuration for all domains. +func (wh *WorkflowNilCheckHandler) ListDomains(ctx context.Context, request *workflowservice.ListDomainsRequest) (_ *workflowservice.ListDomainsResponse, retError error) { + resp, err := wh.parentHandler.ListDomains(ctx, request) + if resp == nil && err == nil { + return &workflowservice.ListDomainsResponse{}, err + } + return resp, err +} + +// UpdateDomain is used to update the information and configuration for a registered domain. +func (wh *WorkflowNilCheckHandler) UpdateDomain(ctx context.Context, request *workflowservice.UpdateDomainRequest) (_ *workflowservice.UpdateDomainResponse, retError error) { + resp, err := wh.parentHandler.UpdateDomain(ctx, request) + if resp == nil && err == nil { + return &workflowservice.UpdateDomainResponse{}, err + } + return resp, err +} + +// DeprecateDomain us used to update status of a registered domain to DEPRECATED. Once the domain is deprecated +// it cannot be used to start new workflow executions. Existing workflow executions will continue to run on +// deprecated domains. +func (wh *WorkflowNilCheckHandler) DeprecateDomain(ctx context.Context, request *workflowservice.DeprecateDomainRequest) (_ *workflowservice.DeprecateDomainResponse, retError error) { + resp, err := wh.parentHandler.DeprecateDomain(ctx, request) + if resp == nil && err == nil { + return &workflowservice.DeprecateDomainResponse{}, err + } + return resp, err +} + +// StartWorkflowExecution starts a new long running workflow instance. It will create the instance with +// 'WorkflowExecutionStarted' event in history and also schedule the first DecisionTask for the worker to make the +// first decision for this instance. It will return 'WorkflowExecutionAlreadyStartedError', if an instance already +// exists with same workflowId. +func (wh *WorkflowNilCheckHandler) StartWorkflowExecution(ctx context.Context, request *workflowservice.StartWorkflowExecutionRequest) (_ *workflowservice.StartWorkflowExecutionResponse, retError error) { + resp, err := wh.parentHandler.StartWorkflowExecution(ctx, request) + if resp == nil && err == nil { + return &workflowservice.StartWorkflowExecutionResponse{}, err + } + return resp, err +} + +// GetWorkflowExecutionHistory returns the history of specified workflow execution. It fails with 'EntityNotExistError' if speficied workflow +// execution in unknown to the service. +func (wh *WorkflowNilCheckHandler) GetWorkflowExecutionHistory(ctx context.Context, request *workflowservice.GetWorkflowExecutionHistoryRequest) (_ *workflowservice.GetWorkflowExecutionHistoryResponse, retError error) { + resp, err := wh.parentHandler.GetWorkflowExecutionHistory(ctx, request) + if resp == nil && err == nil { + return &workflowservice.GetWorkflowExecutionHistoryResponse{}, err + } + return resp, err +} + +// PollForDecisionTask is called by application worker to process DecisionTask from a specific taskList. A +// DecisionTask is dispatched to callers for active workflow executions, with pending decisions. +// Application is then expected to call 'RespondDecisionTaskCompleted' API when it is done processing the DecisionTask. +// It will also create a 'DecisionTaskStarted' event in the history for that session before handing off DecisionTask to +// application worker. +func (wh *WorkflowNilCheckHandler) PollForDecisionTask(ctx context.Context, request *workflowservice.PollForDecisionTaskRequest) (_ *workflowservice.PollForDecisionTaskResponse, retError error) { + resp, err := wh.parentHandler.PollForDecisionTask(ctx, request) + if resp == nil && err == nil { + return &workflowservice.PollForDecisionTaskResponse{}, err + } + return resp, err +} + +// RespondDecisionTaskCompleted is called by application worker to complete a DecisionTask handed as a result of +// 'PollForDecisionTask' API call. Completing a DecisionTask will result in new events for the workflow execution and +// potentially new ActivityTask being created for corresponding decisions. It will also create a DecisionTaskCompleted +// event in the history for that session. Use the 'taskToken' provided as response of PollForDecisionTask API call +// for completing the DecisionTask. +// The response could contain a new decision task if there is one or if the request asking for one. +func (wh *WorkflowNilCheckHandler) RespondDecisionTaskCompleted(ctx context.Context, request *workflowservice.RespondDecisionTaskCompletedRequest) (_ *workflowservice.RespondDecisionTaskCompletedResponse, retError error) { + resp, err := wh.parentHandler.RespondDecisionTaskCompleted(ctx, request) + if resp == nil && err == nil { + return &workflowservice.RespondDecisionTaskCompletedResponse{}, err + } + return resp, err +} + +// RespondDecisionTaskFailed is called by application worker to indicate failure. This results in +// DecisionTaskFailedEvent written to the history and a new DecisionTask created. This API can be used by client to +// either clear sticky tasklist or report any panics during DecisionTask processing. Cadence will only append first +// DecisionTaskFailed event to the history of workflow execution for consecutive failures. +func (wh *WorkflowNilCheckHandler) RespondDecisionTaskFailed(ctx context.Context, request *workflowservice.RespondDecisionTaskFailedRequest) (_ *workflowservice.RespondDecisionTaskFailedResponse, retError error) { + resp, err := wh.parentHandler.RespondDecisionTaskFailed(ctx, request) + if resp == nil && err == nil { + return &workflowservice.RespondDecisionTaskFailedResponse{}, err + } + return resp, err +} + +// PollForActivityTask is called by application worker to process ActivityTask from a specific taskList. ActivityTask +// is dispatched to callers whenever a ScheduleTask decision is made for a workflow execution. +// Application is expected to call 'RespondActivityTaskCompleted' or 'RespondActivityTaskFailed' once it is done +// processing the task. +// Application also needs to call 'RecordActivityTaskHeartbeat' API within 'heartbeatTimeoutSeconds' interval to +// prevent the task from getting timed out. An event 'ActivityTaskStarted' event is also written to workflow execution +// history before the ActivityTask is dispatched to application worker. +func (wh *WorkflowNilCheckHandler) PollForActivityTask(ctx context.Context, request *workflowservice.PollForActivityTaskRequest) (_ *workflowservice.PollForActivityTaskResponse, retError error) { + resp, err := wh.parentHandler.PollForActivityTask(ctx, request) + if resp == nil && err == nil { + return &workflowservice.PollForActivityTaskResponse{}, err + } + return resp, err +} + +// RecordActivityTaskHeartbeat is called by application worker while it is processing an ActivityTask. If worker fails +// to heartbeat within 'heartbeatTimeoutSeconds' interval for the ActivityTask, then it will be marked as timedout and +// 'ActivityTaskTimedOut' event will be written to the workflow history. Calling 'RecordActivityTaskHeartbeat' will +// fail with 'EntityNotExistsError' in such situations. Use the 'taskToken' provided as response of +// PollForActivityTask API call for heartbeating. +func (wh *WorkflowNilCheckHandler) RecordActivityTaskHeartbeat(ctx context.Context, request *workflowservice.RecordActivityTaskHeartbeatRequest) (_ *workflowservice.RecordActivityTaskHeartbeatResponse, retError error) { + resp, err := wh.parentHandler.RecordActivityTaskHeartbeat(ctx, request) + if resp == nil && err == nil { + return &workflowservice.RecordActivityTaskHeartbeatResponse{}, err + } + return resp, err +} + +// RecordActivityTaskHeartbeatByID is called by application worker while it is processing an ActivityTask. If worker fails +// to heartbeat within 'heartbeatTimeoutSeconds' interval for the ActivityTask, then it will be marked as timedout and +// 'ActivityTaskTimedOut' event will be written to the workflow history. Calling 'RecordActivityTaskHeartbeatByID' will +// fail with 'EntityNotExistsError' in such situations. Instead of using 'taskToken' like in RecordActivityTaskHeartbeat, +// use Domain, WorkflowID and ActivityID +func (wh *WorkflowNilCheckHandler) RecordActivityTaskHeartbeatByID(ctx context.Context, request *workflowservice.RecordActivityTaskHeartbeatByIDRequest) (_ *workflowservice.RecordActivityTaskHeartbeatByIDResponse, retError error) { + resp, err := wh.parentHandler.RecordActivityTaskHeartbeatByID(ctx, request) + if resp == nil && err == nil { + return &workflowservice.RecordActivityTaskHeartbeatByIDResponse{}, err + } + return resp, err +} + +// RespondActivityTaskCompleted is called by application worker when it is done processing an ActivityTask. It will +// result in a new 'ActivityTaskCompleted' event being written to the workflow history and a new DecisionTask +// created for the workflow so new decisions could be made. Use the 'taskToken' provided as response of +// PollForActivityTask API call for completion. It fails with 'EntityNotExistsError' if the taskToken is not valid +// anymore due to activity timeout. +func (wh *WorkflowNilCheckHandler) RespondActivityTaskCompleted(ctx context.Context, request *workflowservice.RespondActivityTaskCompletedRequest) (_ *workflowservice.RespondActivityTaskCompletedResponse, retError error) { + resp, err := wh.parentHandler.RespondActivityTaskCompleted(ctx, request) + if resp == nil && err == nil { + return &workflowservice.RespondActivityTaskCompletedResponse{}, err + } + return resp, err +} + +// RespondActivityTaskCompletedByID is called by application worker when it is done processing an ActivityTask. +// It will result in a new 'ActivityTaskCompleted' event being written to the workflow history and a new DecisionTask +// created for the workflow so new decisions could be made. Similar to RespondActivityTaskCompleted but use Domain, +// WorkflowID and ActivityID instead of 'taskToken' for completion. It fails with 'EntityNotExistsError' +// if the these IDs are not valid anymore due to activity timeout. +func (wh *WorkflowNilCheckHandler) RespondActivityTaskCompletedByID(ctx context.Context, request *workflowservice.RespondActivityTaskCompletedByIDRequest) (_ *workflowservice.RespondActivityTaskCompletedByIDResponse, retError error) { + resp, err := wh.parentHandler.RespondActivityTaskCompletedByID(ctx, request) + if resp == nil && err == nil { + return &workflowservice.RespondActivityTaskCompletedByIDResponse{}, err + } + return resp, err +} + +// RespondActivityTaskFailed is called by application worker when it is done processing an ActivityTask. It will +// result in a new 'ActivityTaskFailed' event being written to the workflow history and a new DecisionTask +// created for the workflow instance so new decisions could be made. Use the 'taskToken' provided as response of +// PollForActivityTask API call for completion. It fails with 'EntityNotExistsError' if the taskToken is not valid +// anymore due to activity timeout. +func (wh *WorkflowNilCheckHandler) RespondActivityTaskFailed(ctx context.Context, request *workflowservice.RespondActivityTaskFailedRequest) (_ *workflowservice.RespondActivityTaskFailedResponse, retError error) { + resp, err := wh.parentHandler.RespondActivityTaskFailed(ctx, request) + if resp == nil && err == nil { + return &workflowservice.RespondActivityTaskFailedResponse{}, err + } + return resp, err +} + +// RespondActivityTaskFailedByID is called by application worker when it is done processing an ActivityTask. +// It will result in a new 'ActivityTaskFailed' event being written to the workflow history and a new DecisionTask +// created for the workflow instance so new decisions could be made. Similar to RespondActivityTaskFailed but use +// Domain, WorkflowID and ActivityID instead of 'taskToken' for completion. It fails with 'EntityNotExistsError' +// if the these IDs are not valid anymore due to activity timeout. +func (wh *WorkflowNilCheckHandler) RespondActivityTaskFailedByID(ctx context.Context, request *workflowservice.RespondActivityTaskFailedByIDRequest) (_ *workflowservice.RespondActivityTaskFailedByIDResponse, retError error) { + resp, err := wh.parentHandler.RespondActivityTaskFailedByID(ctx, request) + if resp == nil && err == nil { + return &workflowservice.RespondActivityTaskFailedByIDResponse{}, err + } + return resp, err +} + +// RespondActivityTaskCanceled is called by application worker when it is successfully canceled an ActivityTask. It will +// result in a new 'ActivityTaskCanceled' event being written to the workflow history and a new DecisionTask +// created for the workflow instance so new decisions could be made. Use the 'taskToken' provided as response of +// PollForActivityTask API call for completion. It fails with 'EntityNotExistsError' if the taskToken is not valid +// anymore due to activity timeout. +func (wh *WorkflowNilCheckHandler) RespondActivityTaskCanceled(ctx context.Context, request *workflowservice.RespondActivityTaskCanceledRequest) (_ *workflowservice.RespondActivityTaskCanceledResponse, retError error) { + resp, err := wh.parentHandler.RespondActivityTaskCanceled(ctx, request) + if resp == nil && err == nil { + return &workflowservice.RespondActivityTaskCanceledResponse{}, err + } + return resp, err +} + +// RespondActivityTaskCanceledByID is called by application worker when it is successfully canceled an ActivityTask. +// It will result in a new 'ActivityTaskCanceled' event being written to the workflow history and a new DecisionTask +// created for the workflow instance so new decisions could be made. Similar to RespondActivityTaskCanceled but use +// Domain, WorkflowID and ActivityID instead of 'taskToken' for completion. It fails with 'EntityNotExistsError' +// if the these IDs are not valid anymore due to activity timeout. +func (wh *WorkflowNilCheckHandler) RespondActivityTaskCanceledByID(ctx context.Context, request *workflowservice.RespondActivityTaskCanceledByIDRequest) (_ *workflowservice.RespondActivityTaskCanceledByIDResponse, retError error) { + resp, err := wh.parentHandler.RespondActivityTaskCanceledByID(ctx, request) + if resp == nil && err == nil { + return &workflowservice.RespondActivityTaskCanceledByIDResponse{}, err + } + return resp, err +} + +// RequestCancelWorkflowExecution is called by application worker when it wants to request cancellation of a workflow instance. +// It will result in a new 'WorkflowExecutionCancelRequested' event being written to the workflow history and a new DecisionTask +// created for the workflow instance so new decisions could be made. It fails with 'EntityNotExistsError' if the workflow is not valid +// anymore due to completion or doesn't exist. +func (wh *WorkflowNilCheckHandler) RequestCancelWorkflowExecution(ctx context.Context, request *workflowservice.RequestCancelWorkflowExecutionRequest) (_ *workflowservice.RequestCancelWorkflowExecutionResponse, retError error) { + resp, err := wh.parentHandler.RequestCancelWorkflowExecution(ctx, request) + if resp == nil && err == nil { + return &workflowservice.RequestCancelWorkflowExecutionResponse{}, err + } + return resp, err +} + +// SignalWorkflowExecution is used to send a signal event to running workflow execution. This results in +// WorkflowExecutionSignaled event recorded in the history and a decision task being created for the execution. +func (wh *WorkflowNilCheckHandler) SignalWorkflowExecution(ctx context.Context, request *workflowservice.SignalWorkflowExecutionRequest) (_ *workflowservice.SignalWorkflowExecutionResponse, retError error) { + resp, err := wh.parentHandler.SignalWorkflowExecution(ctx, request) + if resp == nil && err == nil { + return &workflowservice.SignalWorkflowExecutionResponse{}, err + } + return resp, err +} + +// SignalWithStartWorkflowExecution is used to ensure sending signal to a workflow. +// If the workflow is running, this results in WorkflowExecutionSignaled event being recorded in the history +// and a decision task being created for the execution. +// If the workflow is not running or not found, this results in WorkflowExecutionStarted and WorkflowExecutionSignaled +// events being recorded in history, and a decision task being created for the execution +func (wh *WorkflowNilCheckHandler) SignalWithStartWorkflowExecution(ctx context.Context, request *workflowservice.SignalWithStartWorkflowExecutionRequest) (_ *workflowservice.SignalWithStartWorkflowExecutionResponse, retError error) { + resp, err := wh.parentHandler.SignalWithStartWorkflowExecution(ctx, request) + if resp == nil && err == nil { + return &workflowservice.SignalWithStartWorkflowExecutionResponse{}, err + } + return resp, err +} + +// ResetWorkflowExecution reset an existing workflow execution to DecisionTaskCompleted event(exclusive). +// And it will immediately terminating the current execution instance. +func (wh *WorkflowNilCheckHandler) ResetWorkflowExecution(ctx context.Context, request *workflowservice.ResetWorkflowExecutionRequest) (_ *workflowservice.ResetWorkflowExecutionResponse, retError error) { + resp, err := wh.parentHandler.ResetWorkflowExecution(ctx, request) + if resp == nil && err == nil { + return &workflowservice.ResetWorkflowExecutionResponse{}, err + } + return resp, err +} + +// TerminateWorkflowExecution terminates an existing workflow execution by recording WorkflowExecutionTerminated event +// in the history and immediately terminating the execution instance. +func (wh *WorkflowNilCheckHandler) TerminateWorkflowExecution(ctx context.Context, request *workflowservice.TerminateWorkflowExecutionRequest) (_ *workflowservice.TerminateWorkflowExecutionResponse, retError error) { + resp, err := wh.parentHandler.TerminateWorkflowExecution(ctx, request) + if resp == nil && err == nil { + return &workflowservice.TerminateWorkflowExecutionResponse{}, err + } + return resp, err +} + +// ListOpenWorkflowExecutions is a visibility API to list the open executions in a specific domain. +func (wh *WorkflowNilCheckHandler) ListOpenWorkflowExecutions(ctx context.Context, request *workflowservice.ListOpenWorkflowExecutionsRequest) (_ *workflowservice.ListOpenWorkflowExecutionsResponse, retError error) { + resp, err := wh.parentHandler.ListOpenWorkflowExecutions(ctx, request) + if resp == nil && err == nil { + return &workflowservice.ListOpenWorkflowExecutionsResponse{}, err + } + return resp, err +} + +// ListClosedWorkflowExecutions is a visibility API to list the closed executions in a specific domain. +func (wh *WorkflowNilCheckHandler) ListClosedWorkflowExecutions(ctx context.Context, request *workflowservice.ListClosedWorkflowExecutionsRequest) (_ *workflowservice.ListClosedWorkflowExecutionsResponse, retError error) { + resp, err := wh.parentHandler.ListClosedWorkflowExecutions(ctx, request) + if resp == nil && err == nil { + return &workflowservice.ListClosedWorkflowExecutionsResponse{}, err + } + return resp, err +} + +// ListWorkflowExecutions is a visibility API to list workflow executions in a specific domain. +func (wh *WorkflowNilCheckHandler) ListWorkflowExecutions(ctx context.Context, request *workflowservice.ListWorkflowExecutionsRequest) (_ *workflowservice.ListWorkflowExecutionsResponse, retError error) { + resp, err := wh.parentHandler.ListWorkflowExecutions(ctx, request) + if resp == nil && err == nil { + return &workflowservice.ListWorkflowExecutionsResponse{}, err + } + return resp, err +} + +// ListArchivedWorkflowExecutions is a visibility API to list archived workflow executions in a specific domain. +func (wh *WorkflowNilCheckHandler) ListArchivedWorkflowExecutions(ctx context.Context, request *workflowservice.ListArchivedWorkflowExecutionsRequest) (_ *workflowservice.ListArchivedWorkflowExecutionsResponse, retError error) { + resp, err := wh.parentHandler.ListArchivedWorkflowExecutions(ctx, request) + if resp == nil && err == nil { + return &workflowservice.ListArchivedWorkflowExecutionsResponse{}, err + } + return resp, err +} + +// ScanWorkflowExecutions is a visibility API to list large amount of workflow executions in a specific domain without order. +func (wh *WorkflowNilCheckHandler) ScanWorkflowExecutions(ctx context.Context, request *workflowservice.ScanWorkflowExecutionsRequest) (_ *workflowservice.ScanWorkflowExecutionsResponse, retError error) { + resp, err := wh.parentHandler.ScanWorkflowExecutions(ctx, request) + if resp == nil && err == nil { + return &workflowservice.ScanWorkflowExecutionsResponse{}, err + } + return resp, err +} + +// CountWorkflowExecutions is a visibility API to count of workflow executions in a specific domain. +func (wh *WorkflowNilCheckHandler) CountWorkflowExecutions(ctx context.Context, request *workflowservice.CountWorkflowExecutionsRequest) (_ *workflowservice.CountWorkflowExecutionsResponse, retError error) { + resp, err := wh.parentHandler.CountWorkflowExecutions(ctx, request) + if resp == nil && err == nil { + return &workflowservice.CountWorkflowExecutionsResponse{}, err + } + return resp, err +} + +// GetSearchAttributes is a visibility API to get all legal keys that could be used in list APIs +func (wh *WorkflowNilCheckHandler) GetSearchAttributes(ctx context.Context, request *workflowservice.GetSearchAttributesRequest) (_ *workflowservice.GetSearchAttributesResponse, retError error) { + resp, err := wh.parentHandler.GetSearchAttributes(ctx, request) + if resp == nil && err == nil { + return &workflowservice.GetSearchAttributesResponse{}, err + } + return resp, err +} + +// RespondQueryTaskCompleted is called by application worker to complete a QueryTask (which is a DecisionTask for query) +// as a result of 'PollForDecisionTask' API call. Completing a QueryTask will unblock the client call to 'QueryWorkflow' +// API and return the query result to client as a response to 'QueryWorkflow' API call. +func (wh *WorkflowNilCheckHandler) RespondQueryTaskCompleted(ctx context.Context, request *workflowservice.RespondQueryTaskCompletedRequest) (_ *workflowservice.RespondQueryTaskCompletedResponse, retError error) { + resp, err := wh.parentHandler.RespondQueryTaskCompleted(ctx, request) + if resp == nil && err == nil { + return &workflowservice.RespondQueryTaskCompletedResponse{}, err + } + return resp, err +} + +// ResetStickyTaskList resets the sticky tasklist related information in mutable state of a given workflow. +// Things cleared are: +// 1. StickyTaskList +// 2. StickyScheduleToStartTimeout +// 3. ClientLibraryVersion +// 4. ClientFeatureVersion +// 5. ClientImpl +func (wh *WorkflowNilCheckHandler) ResetStickyTaskList(ctx context.Context, request *workflowservice.ResetStickyTaskListRequest) (_ *workflowservice.ResetStickyTaskListResponse, retError error) { + resp, err := wh.parentHandler.ResetStickyTaskList(ctx, request) + if resp == nil && err == nil { + return &workflowservice.ResetStickyTaskListResponse{}, err + } + return resp, err +} + +// QueryWorkflow returns query result for a specified workflow execution +func (wh *WorkflowNilCheckHandler) QueryWorkflow(ctx context.Context, request *workflowservice.QueryWorkflowRequest) (_ *workflowservice.QueryWorkflowResponse, retError error) { + resp, err := wh.parentHandler.QueryWorkflow(ctx, request) + if resp == nil && err == nil { + return &workflowservice.QueryWorkflowResponse{}, err + } + return resp, err +} + +// DescribeWorkflowExecution returns information about the specified workflow execution. +func (wh *WorkflowNilCheckHandler) DescribeWorkflowExecution(ctx context.Context, request *workflowservice.DescribeWorkflowExecutionRequest) (_ *workflowservice.DescribeWorkflowExecutionResponse, retError error) { + resp, err := wh.parentHandler.DescribeWorkflowExecution(ctx, request) + if resp == nil && err == nil { + return &workflowservice.DescribeWorkflowExecutionResponse{}, err + } + return resp, err +} + +// DescribeTaskList returns information about the target tasklist, right now this API returns the +// pollers which polled this tasklist in last few minutes. +func (wh *WorkflowNilCheckHandler) DescribeTaskList(ctx context.Context, request *workflowservice.DescribeTaskListRequest) (_ *workflowservice.DescribeTaskListResponse, retError error) { + resp, err := wh.parentHandler.DescribeTaskList(ctx, request) + if resp == nil && err == nil { + return &workflowservice.DescribeTaskListResponse{}, err + } + return resp, err +} + +// GetWorkflowExecutionRawHistory retrieves raw history directly from DB layer. +func (wh *WorkflowNilCheckHandler) GetWorkflowExecutionRawHistory(ctx context.Context, request *workflowservice.GetWorkflowExecutionRawHistoryRequest) (_ *workflowservice.GetWorkflowExecutionRawHistoryResponse, retError error) { + resp, err := wh.parentHandler.GetWorkflowExecutionRawHistory(ctx, request) + if resp == nil && err == nil { + return &workflowservice.GetWorkflowExecutionRawHistoryResponse{}, err + } + return resp, err +} + +// GetClusterInfo ... +func (wh *WorkflowNilCheckHandler) GetClusterInfo(ctx context.Context, request *workflowservice.GetClusterInfoRequest) (_ *workflowservice.GetClusterInfoResponse, retError error) { + resp, err := wh.parentHandler.GetClusterInfo(ctx, request) + if resp == nil && err == nil { + return &workflowservice.GetClusterInfoResponse{}, err + } + return resp, err +} + +// ListTaskListPartitions ... +func (wh *WorkflowNilCheckHandler) ListTaskListPartitions(ctx context.Context, request *workflowservice.ListTaskListPartitionsRequest) (_ *workflowservice.ListTaskListPartitionsResponse, retError error) { + resp, err := wh.parentHandler.ListTaskListPartitions(ctx, request) + if resp == nil && err == nil { + return &workflowservice.ListTaskListPartitionsResponse{}, err + } + return resp, err +} diff --git a/service/history/decisionHandler.go b/service/history/decisionHandler.go index 84745c14cc8..cce6f400a2d 100644 --- a/service/history/decisionHandler.go +++ b/service/history/decisionHandler.go @@ -27,8 +27,6 @@ import ( "github.com/temporalio/temporal/common/client" - "go.uber.org/yarpc" - h "github.com/temporalio/temporal/.gen/go/history" workflow "github.com/temporalio/temporal/.gen/go/shared" "github.com/temporalio/temporal/common" @@ -282,10 +280,10 @@ func (handler *decisionHandlerImpl) handleDecisionTaskCompleted( RunId: common.StringPtr(token.RunID), } - call := yarpc.CallFromContext(ctx) - clientLibVersion := call.Header(common.LibraryVersionHeaderName) - clientFeatureVersion := call.Header(common.FeatureVersionHeaderName) - clientImpl := call.Header(common.ClientImplHeaderName) + headers := client.GetHeadersValue(ctx, common.LibraryVersionHeaderName, common.FeatureVersionHeaderName, common.ClientImplHeaderName) + clientLibVersion := headers[0] + clientFeatureVersion := headers[1] + clientImpl := headers[2] context, release, err := handler.historyCache.getOrCreateWorkflowExecution(ctx, domainID, workflowExecution) if err != nil { diff --git a/service/matching/matchingEngine.go b/service/matching/matchingEngine.go index c876ae99cca..ea604e69ee3 100644 --- a/service/matching/matchingEngine.go +++ b/service/matching/matchingEngine.go @@ -486,6 +486,7 @@ func (e *matchingEngineImpl) QueryWorkflow(ctx context.Context, queryRequest *m. if err := e.versionChecker.SupportsConsistentQuery( workerResponse.GetCompletedRequest().GetWorkerVersionInfo().GetImpl(), workerResponse.GetCompletedRequest().GetWorkerVersionInfo().GetFeatureVersion()); err != nil { + // TODO: this error is swallowed and client gets "deadline exceeded" instead. return nil, err } }