Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose workflow client from Activity #1783

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion activity/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ package activity

import (
"context"

"go.temporal.io/sdk/client"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am slightly concerned about this package dependency. We now will never be able to have any package that client depends on depend on the activity package. Maybe that's ok? I wonder if it'd be ok to just do just have GetClient return internal.Client instead. Thoughts?

"go.temporal.io/sdk/internal"
"go.temporal.io/sdk/internal/common/metrics"
"go.temporal.io/sdk/log"
Expand Down Expand Up @@ -108,3 +108,9 @@ func GetWorkerStopChannel(ctx context.Context) <-chan struct{} {
func IsActivity(ctx context.Context) bool {
return internal.IsActivity(ctx)
}

// GetClient returns a client that can be used to interact with the Temporal
// service from an activity.
func GetClient(ctx context.Context) client.Client {
return internal.GetClient(ctx)
}
7 changes: 7 additions & 0 deletions internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,13 @@ func RecordActivityHeartbeat(ctx context.Context, details ...interface{}) {
getActivityOutboundInterceptor(ctx).RecordHeartbeat(ctx, details...)
}

// GetClient returns a client that can be used to interact with the Temporal
// service from an activity.
func GetClient(ctx context.Context) Client {
activityEnv := getActivityEnv(ctx)
return activityEnv.serviceInvoker.GetClient(ClientOptions{Namespace: activityEnv.workflowNamespace})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm, but you are not using the client , or client options the worker was started with. So if the worker client was configured with a data converter or interceptor I don't think this client will have it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you might need to just wire the client from worker constructor down to here.

}

// ServiceInvoker abstracts calls to the Temporal service from an activity implementation.
// Implement to unit test activities.
type ServiceInvoker interface {
Expand Down
9 changes: 9 additions & 0 deletions internal/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,12 @@ func (s *activityTestSuite) TestIsActivity() {
ctx, _ = newActivityContext(context.Background(), nil, &activityEnvironment{workerStopChannel: ch})
s.True(IsActivity(ctx))
}

func (s *activityTestSuite) TestGetClient() {
ctx, cancel := context.WithCancel(context.Background())
invoker := newServiceInvoker([]byte("task-token"), "identity", s.service, metrics.NopHandler, cancel,
1*time.Second, make(chan struct{}), s.namespace)
ctx, _ = newActivityContext(ctx, nil, &activityEnvironment{serviceInvoker: invoker})
client := GetClient(ctx)
s.NotNil(client)
}
16 changes: 16 additions & 0 deletions test/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"context"
"errors"
"fmt"
"go.temporal.io/api/workflowservice/v1"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -435,3 +436,18 @@ func (a *Activities) register(worker worker.Worker) {
worker.RegisterActivityWithOptions(a.activities2, activity.RegisterOptions{Name: "Prefix_", DisableAlreadyRegisteredCheck: true})
worker.RegisterActivityWithOptions(a.InspectActivityInfo, activity.RegisterOptions{Name: "inspectActivityInfo"})
}

func (a *Activities) ClientFromActivity(ctx context.Context) error {
activityClient := activity.GetClient(ctx)
info := activity.GetInfo(ctx)
request := workflowservice.ListWorkflowExecutionsRequest{Namespace: info.WorkflowNamespace}
resp, err := activityClient.ListWorkflow(ctx, &request)
if err != nil {
return err
}

if len(resp.Executions) == 0 {
return fmt.Errorf("expected non-empty list of executions")
}
return nil
}
5 changes: 5 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6633,6 +6633,11 @@ func (ts *IntegrationTestSuite) TestAwaitWithOptionsTimeout() {
ts.Equal("await-timer", str)
}

func (ts *IntegrationTestSuite) TestClientFromActivity() {
err := ts.executeWorkflow("client-from-activity", ts.workflows.WorkflowClientFromActivity, nil)
ts.NoError(err)
}

// executeWorkflow executes a given workflow and waits for the result
func (ts *IntegrationTestSuite) executeWorkflow(
wfID string, wfFunc interface{}, retValPtr interface{}, args ...interface{},
Expand Down
12 changes: 12 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3249,6 +3249,17 @@ func (w *Workflows) SelectorBlockSignal(ctx workflow.Context) (string, error) {
return hello, nil
}

func (w *Workflows) WorkflowClientFromActivity(ctx workflow.Context) error {
ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions())
var activities *Activities
var result string
err := workflow.ExecuteActivity(ctx, activities.ClientFromActivity).Get(ctx, &result)
if err != nil {
return err
}
return nil
}

func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.ActivityCancelRepro)
worker.RegisterWorkflow(w.ActivityCompletionUsingID)
Expand Down Expand Up @@ -3386,6 +3397,7 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.Echo)
worker.RegisterWorkflow(w.RunsLocalAndNonlocalActsWithRetries)
worker.RegisterWorkflow(w.SelectorBlockSignal)
worker.RegisterWorkflow(w.WorkflowClientFromActivity)
}

func (w *Workflows) defaultActivityOptions() workflow.ActivityOptions {
Expand Down
Loading