Skip to content

Commit

Permalink
Add Nexus links tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigozhou committed Dec 19, 2024
1 parent ccb28ef commit 255cccf
Showing 1 changed file with 108 additions and 12 deletions.
120 changes: 108 additions & 12 deletions test/nexus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
nexuspb "go.temporal.io/api/nexus/v1"
"go.temporal.io/api/operatorservice/v1"
"go.temporal.io/api/serviceerror"
"google.golang.org/protobuf/proto"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/converter"
Expand Down Expand Up @@ -385,10 +386,22 @@ func TestNexusWorkflowRunOperation(t *testing.T) {

nc := tc.newNexusClient(t, service.Name)

link := &common.Link_WorkflowEvent{
Namespace: tc.testConfig.Namespace,
WorkflowId: "caller-wf-id",
RunId: "caller-run-id",
Reference: &common.Link_WorkflowEvent_EventRef{
EventRef: &common.Link_WorkflowEvent_EventReference{
EventType: enums.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED,
},
},
}

workflowID := "nexus-handler-workflow-" + uuid.NewString()
result, err := nexus.StartOperation(ctx, nc, workflowOp, workflowID, nexus.StartOperationOptions{
CallbackURL: "http://localhost/test",
CallbackHeader: nexus.Header{"test": "ok"},
Links: []nexus.Link{temporalnexus.ConvertLinkWorkflowEventToNexusLink(link)},
})
require.NoError(t, err)
require.NotNil(t, result.Pending)
Expand All @@ -403,6 +416,17 @@ func TestNexusWorkflowRunOperation(t *testing.T) {
require.Equal(t, "http://localhost/test", callback.Nexus.Url)
require.Subset(t, callback.Nexus.Header, map[string]string{"test": "ok"})

iter := tc.client.GetWorkflowHistory(ctx, workflowID, "", false, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
for iter.HasNext() {
event, err := iter.Next()
require.NoError(t, err)
if event.GetEventType() == enums.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED {
require.Len(t, event.GetLinks(), 1)
require.True(t, proto.Equal(link, event.GetLinks()[0].GetWorkflowEvent()))
break
}
}

run := tc.client.GetWorkflow(ctx, workflowID, "")
require.NoError(t, handle.Cancel(ctx, nexus.CancelOperationOptions{}))
require.ErrorContains(t, run.Get(ctx, nil), "canceled")
Expand Down Expand Up @@ -545,19 +569,26 @@ func TestAsyncOperationFromWorkflow(t *testing.T) {
panic(fmt.Errorf("unexpected outcome: %s", action))
}
}
op := temporalnexus.NewWorkflowRunOperation("op", handlerWorkflow, func(ctx context.Context, action string, soo nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
require.NotPanicsf(t, func() {
temporalnexus.GetMetricsHandler(ctx)
temporalnexus.GetLogger(ctx)
}, "Failed to get metrics handler or logger from operation context.")
handlerWfID := ""
op := temporalnexus.NewWorkflowRunOperation(
"op",
handlerWorkflow,
func(ctx context.Context, action string, soo nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
require.NotPanicsf(t, func() {
temporalnexus.GetMetricsHandler(ctx)
temporalnexus.GetLogger(ctx)
}, "Failed to get metrics handler or logger from operation context.")

if action == "fail-to-start" {
return client.StartWorkflowOptions{}, nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "fake internal error")
}
return client.StartWorkflowOptions{
ID: soo.RequestID,
}, nil
})
handlerWfID = ""
if action == "fail-to-start" {
return client.StartWorkflowOptions{}, nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "fake internal error")
}
handlerWfID = soo.RequestID
return client.StartWorkflowOptions{
ID: soo.RequestID,
}, nil
},
)
callerWorkflow := func(ctx workflow.Context, action string) error {
c := workflow.NewNexusClient(tc.endpoint, "test")
ctx, cancel := workflow.WithCancel(ctx)
Expand Down Expand Up @@ -611,6 +642,71 @@ func TestAsyncOperationFromWorkflow(t *testing.T) {
}, callerWorkflow, "succeed")
require.NoError(t, err)
require.NoError(t, run.Get(ctx, nil))

// Check the link is added in the caller workflow.
iter := tc.client.GetWorkflowHistory(
ctx,
run.GetID(),
run.GetRunID(),
false,
enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
)
var targetEvent *historypb.HistoryEvent
for iter.HasNext() {
event, err := iter.Next()
require.NoError(t, err)
if event.GetEventType() == enums.EVENT_TYPE_NEXUS_OPERATION_STARTED {
targetEvent = event
break
}
}
require.NotNil(t, targetEvent)
require.Len(t, targetEvent.GetLinks(), 1)
link := targetEvent.GetLinks()[0]
require.Equal(t, tc.testConfig.Namespace, link.GetWorkflowEvent().GetNamespace())
require.Equal(t, handlerWfID, link.GetWorkflowEvent().GetWorkflowId())
require.NotEmpty(t, link.GetWorkflowEvent().GetRunId())
require.True(t, proto.Equal(
&common.Link_WorkflowEvent_EventReference{
EventType: enums.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
},
link.GetWorkflowEvent().GetEventRef(),
))
handlerRunID := link.GetWorkflowEvent().GetRunId()

// Check the link is added in the handler workflow.
iter = tc.client.GetWorkflowHistory(
ctx,
handlerWfID,
handlerRunID,
false,
enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
)
targetEvent = nil
for iter.HasNext() {
event, err := iter.Next()
require.NoError(t, err)
if event.GetEventType() == enums.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED {
targetEvent = event
break
}
}
require.NotNil(t, targetEvent)
require.Len(t, targetEvent.GetLinks(), 1)
require.True(t, proto.Equal(
&common.Link_WorkflowEvent{
Namespace: tc.testConfig.Namespace,
WorkflowId: run.GetID(),
RunId: run.GetRunID(),
Reference: &common.Link_WorkflowEvent_EventRef{
EventRef: &common.Link_WorkflowEvent_EventReference{
EventId: 5,
EventType: enums.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED,
},
},
},
targetEvent.GetLinks()[0].GetWorkflowEvent(),
))
})

t.Run("OpFailed", func(t *testing.T) {
Expand Down

0 comments on commit 255cccf

Please sign in to comment.