Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(command): refactor messaging handler #4204

Merged
merged 2 commits into from
Oct 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 23 additions & 14 deletions internal/core/command/controller/messaging/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/edgexfoundry/go-mod-bootstrap/v2/di"
"github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger"
"github.com/edgexfoundry/go-mod-core-contracts/v2/common"

"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"

"github.com/edgexfoundry/edgex-go/internal/core/command/container"
Expand All @@ -36,24 +35,24 @@ func OnConnectHandler(router MessagingRouter, dic *di.Container) mqtt.OnConnectH
config := container.ConfigurationFrom(dic.Get)
externalTopics := config.MessageQueue.External.Topics
qos := config.MessageQueue.External.QoS
retain := config.MessageQueue.External.Retain

requestQueryTopic := externalTopics[QueryRequestTopic]
responseQueryTopic := externalTopics[QueryResponseTopic]
if token := client.Subscribe(requestQueryTopic, qos, commandQueryHandler(responseQueryTopic, qos, retain, dic)); token.Wait() && token.Error() != nil {
lc.Errorf("could not subscribe to topic '%s': %s", responseQueryTopic, token.Error().Error())
return
if token := client.Subscribe(requestQueryTopic, qos, commandQueryHandler(dic)); token.Wait() && token.Error() != nil {
lc.Errorf("could not subscribe to topic '%s': %s", requestQueryTopic, token.Error().Error())
} else {
lc.Debugf("Subscribed to topic '%s' on external MQTT broker", requestQueryTopic)
}

requestCommandTopic := externalTopics[CommandRequestTopic]
if token := client.Subscribe(requestCommandTopic, qos, commandRequestHandler(router, dic)); token.Wait() && token.Error() != nil {
lc.Errorf("could not subscribe to topic '%s': %s", responseQueryTopic, token.Error().Error())
return
lc.Errorf("could not subscribe to topic '%s': %s", requestCommandTopic, token.Error().Error())
} else {
lc.Debugf("Subscribed to topic '%s' on external MQTT broker", requestCommandTopic)
}
}
}

func commandQueryHandler(responseTopic string, qos byte, retain bool, dic *di.Container) mqtt.MessageHandler {
func commandQueryHandler(dic *di.Container) mqtt.MessageHandler {
return func(client mqtt.Client, message mqtt.Message) {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
lc.Debugf("Received command query request from external message queue on topic '%s' with %d bytes", message.Topic(), len(message.Payload()))
Expand All @@ -65,19 +64,29 @@ func commandQueryHandler(responseTopic string, qos byte, retain bool, dic *di.Co
return
}

// example topic scheme: edgex/commandquery/request/<device>
messageBusInfo := container.ConfigurationFrom(dic.Get).MessageQueue
responseTopic := messageBusInfo.External.Topics[QueryResponseTopic]
if responseTopic == "" {
lc.Error("QueryResponseTopic not provided in External.Topics")
lc.Warn("Not publishing error message back due to insufficient information on response topic")
return
}

// example topic scheme: edgex/commandquery/request/<device-name>
// deviceName is expected to be at last topic level.
topicLevels := strings.Split(message.Topic(), "/")
deviceName := topicLevels[len(topicLevels)-1]
if strings.EqualFold(deviceName, common.All) {
deviceName = common.All
}

responseEnvelope, edgexErr := getCommandQueryResponseEnvelope(requestEnvelope, deviceName, dic)
if edgexErr != nil {
responseEnvelope = types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, edgexErr.Error())
responseEnvelope, err := getCommandQueryResponseEnvelope(requestEnvelope, deviceName, dic)
if err != nil {
responseEnvelope = types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error())
}

qos := messageBusInfo.External.QoS
retain := messageBusInfo.External.Retain
publishMessage(client, responseTopic, qos, retain, responseEnvelope, lc)
}
}
Expand Down Expand Up @@ -120,7 +129,7 @@ func commandRequestHandler(router MessagingRouter, dic *di.Container) mqtt.Messa
deviceRequestTopic, err := validateRequestTopic(messageBusInfo.Internal.Topics[DeviceRequestTopicPrefix], deviceName, commandName, method, dic)
if err != nil {
lc.Errorf("invalid request topic: %s", err.Error())
responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, "nil Device Client")
responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error())
publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc)
return
}
Expand Down
24 changes: 18 additions & 6 deletions internal/core/command/controller/messaging/external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,22 @@ import (
"strings"
"testing"

bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container"
bootstrapConfig "github.com/edgexfoundry/go-mod-bootstrap/v2/config"
"github.com/edgexfoundry/go-mod-bootstrap/v2/di"
clientMocks "github.com/edgexfoundry/go-mod-core-contracts/v2/clients/interfaces/mocks"
lcMocks "github.com/edgexfoundry/go-mod-core-contracts/v2/clients/logger/mocks"
"github.com/edgexfoundry/go-mod-core-contracts/v2/common"
"github.com/edgexfoundry/go-mod-core-contracts/v2/dtos"
commonDTO "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos/common"
"github.com/edgexfoundry/go-mod-core-contracts/v2/dtos/responses"
edgexErr "github.com/edgexfoundry/go-mod-core-contracts/v2/errors"
internalMessagingMocks "github.com/edgexfoundry/go-mod-messaging/v2/messaging/mocks"
"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container"
bootstrapConfig "github.com/edgexfoundry/go-mod-bootstrap/v2/config"
"github.com/edgexfoundry/go-mod-bootstrap/v2/di"
internalMessagingMocks "github.com/edgexfoundry/go-mod-messaging/v2/messaging/mocks"
"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"

"github.com/edgexfoundry/edgex-go/internal/core/command/config"
"github.com/edgexfoundry/edgex-go/internal/core/command/container"
"github.com/edgexfoundry/edgex-go/internal/core/command/controller/messaging/mocks"
Expand Down Expand Up @@ -60,6 +61,7 @@ func TestOnConnectHandler(t *testing.T) {
mockRouter := &mocks.MessagingRouter{}
lc := &lcMocks.LoggingClient{}
lc.On("Errorf", mock.Anything, mock.Anything, mock.Anything).Return(nil)
lc.On("Debugf", mock.Anything, mock.Anything).Return(nil)
dic := di.NewContainer(di.ServiceConstructorMap{
container.ConfigurationName: func(get di.Get) interface{} {
return &config.ConfigurationStruct{
Expand Down Expand Up @@ -169,6 +171,16 @@ func Test_commandQueryHandler(t *testing.T) {
Port: mockPort,
MaxResultCount: 20,
},
MessageQueue: config.MessageQueue{
Internal: bootstrapConfig.MessageBusInfo{},
External: bootstrapConfig.ExternalMQTTInfo{
QoS: 0,
Retain: true,
Topics: map[string]string{
QueryResponseTopic: testQueryResponseTopic,
},
},
},
}
},
bootstrapContainer.LoggingClientInterfaceName: func(get di.Get) interface{} {
Expand Down Expand Up @@ -216,7 +228,7 @@ func Test_commandQueryHandler(t *testing.T) {
mqttClient := &mocks.Client{}
mqttClient.On("Publish", testQueryResponseTopic, byte(0), true, mock.Anything).Return(token)

fn := commandQueryHandler(testQueryResponseTopic, 0, true, dic)
fn := commandQueryHandler(dic)
fn(mqttClient, message)
if tt.expectedError {
if tt.expectedPublishError {
Expand Down
21 changes: 11 additions & 10 deletions internal/core/command/controller/messaging/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ import (
"context"
"strings"

bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v2/di"
"github.com/edgexfoundry/go-mod-core-contracts/v2/common"
"github.com/edgexfoundry/go-mod-core-contracts/v2/errors"

bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v2/di"

"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"

"github.com/edgexfoundry/edgex-go/internal/core/command/container"
Expand Down Expand Up @@ -52,7 +53,7 @@ func SubscribeCommandResponses(ctx context.Context, router MessagingRouter, dic
case err = <-messageErrors:
lc.Error(err.Error())
case msgEnvelope := <-messages:
lc.Debugf("Command response received on internal MessageBus. Topic: %s, Correlation-id: %s ", msgEnvelope.ReceivedTopic, msgEnvelope.CorrelationID)
lc.Debugf("Command response received on internal MessageBus. Topic: %s, Correlation-id: %s", msgEnvelope.ReceivedTopic, msgEnvelope.CorrelationID)

responseTopic, external, err := router.ResponseTopic(msgEnvelope.RequestID)
if err != nil {
Expand All @@ -72,7 +73,7 @@ func SubscribeCommandResponses(ctx context.Context, router MessagingRouter, dic
lc.Errorf("Could not publish to internal MessageBus topic '%s': %s", responseTopic, err.Error())
continue
}
lc.Debugf("Published response message to internal MessageBus on topic '%s'", responseTopic)
lc.Debugf("Command response sent to internal MessageBus. Topic: %s, Correlation-id: %s", responseTopic, msgEnvelope.CorrelationID)
}
}
}()
Expand Down Expand Up @@ -111,7 +112,7 @@ func SubscribeCommandRequests(ctx context.Context, router MessagingRouter, dic *
case err = <-messageErrors:
lc.Error(err.Error())
case requestEnvelope := <-messages:
lc.Debugf("Command request received on internal MessageBus. Topic: %s, Correlation-id: %s ", requestEnvelope.ReceivedTopic, requestEnvelope.CorrelationID)
lc.Debugf("Command request received on internal MessageBus. Topic: %s, Correlation-id: %s", requestEnvelope.ReceivedTopic, requestEnvelope.CorrelationID)

topicLevels := strings.Split(requestEnvelope.ReceivedTopic, "/")
length := len(topicLevels)
Expand Down Expand Up @@ -192,7 +193,7 @@ func SubscribeCommandQueryRequests(ctx context.Context, dic *di.Container) error
case err = <-messageErrors:
lc.Error(err.Error())
case requestEnvelope := <-messages:
lc.Debugf("Command query request received on internal MessageBus. Topic: %s, Correlation-id: %s ", requestEnvelope.ReceivedTopic, requestEnvelope.CorrelationID)
lc.Debugf("Command query request received on internal MessageBus. Topic: %s, Correlation-id: %s", requestEnvelope.ReceivedTopic, requestEnvelope.CorrelationID)

// example topic scheme: /commandquery/request/<device>
// deviceName is expected to be at last topic level.
Expand All @@ -202,10 +203,10 @@ func SubscribeCommandQueryRequests(ctx context.Context, dic *di.Container) error
deviceName = common.All
}

responseEnvelope, edgexErr := getCommandQueryResponseEnvelope(requestEnvelope, deviceName, dic)
if edgexErr != nil {
lc.Error(edgexErr.Error())
responseEnvelope = types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, edgexErr.Error())
responseEnvelope, err := getCommandQueryResponseEnvelope(requestEnvelope, deviceName, dic)
if err != nil {
lc.Error(err.Error())
responseEnvelope = types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error())
}

err = messageBus.Publish(responseEnvelope, internalQueryResponseTopic)
Expand Down
16 changes: 7 additions & 9 deletions internal/core/command/controller/messaging/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (
"github.com/edgexfoundry/go-mod-bootstrap/v2/di"
"github.com/edgexfoundry/go-mod-core-contracts/v2/common"
"github.com/edgexfoundry/go-mod-core-contracts/v2/dtos/responses"
edgexErr "github.com/edgexfoundry/go-mod-core-contracts/v2/errors"

"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"

"github.com/edgexfoundry/edgex-go/internal/core/command/application"
Expand Down Expand Up @@ -54,7 +52,7 @@ func validateRequestTopic(prefix string, deviceName string, commandName string,
}

// getCommandQueryResponseEnvelope returns the MessageEnvelope containing the DeviceCoreCommand payload bytes
func getCommandQueryResponseEnvelope(requestEnvelope types.MessageEnvelope, deviceName string, dic *di.Container) (types.MessageEnvelope, edgexErr.EdgeX) {
func getCommandQueryResponseEnvelope(requestEnvelope types.MessageEnvelope, deviceName string, dic *di.Container) (types.MessageEnvelope, error) {
var commandsResponse any
var err error

Expand All @@ -65,40 +63,40 @@ func getCommandQueryResponseEnvelope(requestEnvelope types.MessageEnvelope, devi
if offsetRaw, ok := requestEnvelope.QueryParams[common.Offset]; ok {
offset, err = strconv.Atoi(offsetRaw)
if err != nil {
return types.MessageEnvelope{}, edgexErr.NewCommonEdgeX(edgexErr.KindContractInvalid, fmt.Sprintf("Failed to convert 'offset' query parameter to intger: %s", err.Error()), err)
return types.MessageEnvelope{}, fmt.Errorf("failed to convert 'offset' query parameter to intger: %s", err.Error())
}
}
if limitRaw, ok := requestEnvelope.QueryParams[common.Limit]; ok {
limit, err = strconv.Atoi(limitRaw)
if err != nil {
return types.MessageEnvelope{}, edgexErr.NewCommonEdgeX(edgexErr.KindContractInvalid, fmt.Sprintf("Failed to convert 'limit' query parameter to integer: %s", err.Error()), err)
return types.MessageEnvelope{}, fmt.Errorf("failed to convert 'limit' query parameter to integer: %s", err.Error())
}
}
}

commands, totalCounts, edgexError := application.AllCommands(offset, limit, dic)
if edgexError != nil {
return types.MessageEnvelope{}, edgexErr.NewCommonEdgeX(edgexErr.KindServerError, fmt.Sprintf("Failed to get all commands: %s", edgexError.Error()), edgexError)
return types.MessageEnvelope{}, fmt.Errorf("failed to get all commands: %s", edgexError.Error())
}

commandsResponse = responses.NewMultiDeviceCoreCommandsResponse(requestEnvelope.RequestID, "", http.StatusOK, totalCounts, commands)
default:
commands, edgexError := application.CommandsByDeviceName(deviceName, dic)
if edgexError != nil {
return types.MessageEnvelope{}, edgexErr.NewCommonEdgeX(edgexErr.KindServerError, fmt.Sprintf("Failed to get commands by device name '%s': %s", deviceName, edgexError.Error()), edgexError)
return types.MessageEnvelope{}, fmt.Errorf("failed to get commands by device name '%s': %s", deviceName, edgexError.Error())
}

commandsResponse = responses.NewDeviceCoreCommandResponse(requestEnvelope.RequestID, "", http.StatusOK, commands)
}

responseBytes, err := json.Marshal(commandsResponse)
if err != nil {
return types.MessageEnvelope{}, edgexErr.NewCommonEdgeX(edgexErr.KindServerError, fmt.Sprintf("Failed to json encoding device commands payload: %s", err.Error()), err)
return types.MessageEnvelope{}, fmt.Errorf("failed to json encoding device commands payload: %s", err.Error())
}

responseEnvelope, err := types.NewMessageEnvelopeForResponse(responseBytes, requestEnvelope.RequestID, requestEnvelope.CorrelationID, common.ContentTypeJSON)
if err != nil {
return types.MessageEnvelope{}, edgexErr.NewCommonEdgeX(edgexErr.KindServerError, fmt.Sprintf("Failed to create response MessageEnvelope: %s", err.Error()), err)
return types.MessageEnvelope{}, fmt.Errorf("failed to create response MessageEnvelope: %s", err.Error())
}

return responseEnvelope, nil
Expand Down