Skip to content

Commit

Permalink
Resolve workflow name alias in Nexus WorkflowRunOperation (#1766)
Browse files Browse the repository at this point in the history
* Resolve workflow name alias in Nexus WorkflowRunOperation

* ResolveWorkflowName as method
  • Loading branch information
bergundy authored Jan 8, 2025
1 parent 423a9a3 commit 4dd46b8
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 7 deletions.
4 changes: 4 additions & 0 deletions internal/internal_nexus_task_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type nexusTaskHandler struct {
failureConverter converter.FailureConverter
logger log.Logger
metricsHandler metrics.Handler
registry *registry
}

func newNexusTaskHandler(
Expand All @@ -83,6 +84,7 @@ func newNexusTaskHandler(
failureConverter converter.FailureConverter,
logger log.Logger,
metricsHandler metrics.Handler,
registry *registry,
) *nexusTaskHandler {
return &nexusTaskHandler{
nexusHandler: nexusHandler,
Expand All @@ -94,6 +96,7 @@ func newNexusTaskHandler(
taskQueueName: taskQueueName,
client: client,
metricsHandler: metricsHandler,
registry: registry,
}
}

Expand Down Expand Up @@ -393,6 +396,7 @@ func (h *nexusTaskHandler) newNexusOperationContext(response *workflowservice.Po
TaskQueue: h.taskQueueName,
MetricsHandler: metricsHandler,
Log: logger,
registry: h.registry,
}, nil
}

Expand Down
2 changes: 2 additions & 0 deletions internal/internal_nexus_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type nexusWorkerOptions struct {
client Client
workflowService workflowservice.WorkflowServiceClient
handler nexus.Handler
registry *registry
}

type nexusWorker struct {
Expand All @@ -57,6 +58,7 @@ func newNexusWorker(opts nexusWorkerOptions) (*nexusWorker, error) {
opts.executionParameters.FailureConverter,
opts.executionParameters.Logger,
opts.executionParameters.MetricsHandler,
opts.registry,
),
opts.workflowService,
params,
Expand Down
1 change: 1 addition & 0 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1128,6 +1128,7 @@ func (aw *AggregatedWorker) start() error {
client: aw.client,
workflowService: aw.client.workflowService,
handler: handler,
registry: aw.registry,
})
if err != nil {
return fmt.Errorf("failed to create a nexus worker: %w", err)
Expand Down
1 change: 1 addition & 0 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -2408,6 +2408,7 @@ func (env *testWorkflowEnvironmentImpl) newTestNexusTaskHandler(
env.failureConverter,
env.logger,
env.metricsHandler,
env.registry,
)
}

Expand Down
15 changes: 10 additions & 5 deletions internal/nexus_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,16 @@ import (

// NexusOperationContext is an internal only struct that holds fields used by the temporalnexus functions.
type NexusOperationContext struct {
Client Client
Namespace string
TaskQueue string
MetricsHandler metrics.Handler
Log log.Logger
Client Client
Namespace string
TaskQueue string
MetricsHandler metrics.Handler
Log log.Logger
registry *registry
}

func (nc *NexusOperationContext) ResolveWorkflowName(wf any) (string, error) {
return getWorkflowFunctionName(nc.registry, wf)
}

type nexusOperationContextKeyType struct{}
Expand Down
8 changes: 7 additions & 1 deletion temporalnexus/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,12 @@ func ExecuteUntypedWorkflow[R any](
if !ok {
return nil, nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error")
}

workflowType, err := nctx.ResolveWorkflowName(workflow)
if err != nil {
panic(err)
}

if startWorkflowOptions.TaskQueue == "" {
startWorkflowOptions.TaskQueue = nctx.TaskQueue
}
Expand Down Expand Up @@ -373,7 +379,7 @@ func ExecuteUntypedWorkflow[R any](
}
internal.SetLinksOnStartWorkflowOptions(&startWorkflowOptions, links)

run, err := nctx.Client.ExecuteWorkflow(ctx, startWorkflowOptions, workflow, args...)
run, err := nctx.Client.ExecuteWorkflow(ctx, startWorkflowOptions, workflowType, args...)
if err != nil {
return nil, err
}
Expand Down
5 changes: 4 additions & 1 deletion test/nexus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ func TestAsyncOperationFromWorkflow(t *testing.T) {
service := nexus.NewService("test")
require.NoError(t, service.Register(op))
w.RegisterNexusService(service)
w.RegisterWorkflow(handlerWorkflow)
w.RegisterWorkflowWithOptions(handlerWorkflow, workflow.RegisterOptions{Name: "foo"})
w.RegisterWorkflow(callerWorkflow)
require.NoError(t, w.Start())
t.Cleanup(w.Stop)
Expand Down Expand Up @@ -693,6 +693,9 @@ func TestAsyncOperationFromWorkflow(t *testing.T) {
}
}
require.NotNil(t, targetEvent)
// Verify that calling by name works.
require.Equal(t, "foo", targetEvent.GetWorkflowExecutionStartedEventAttributes().WorkflowType.Name)
// Verify that links are properly attached.
require.Len(t, targetEvent.GetLinks(), 1)
require.True(t, proto.Equal(
&common.Link_WorkflowEvent{
Expand Down

0 comments on commit 4dd46b8

Please sign in to comment.