Skip to content

Integration Rules Message Processing

Eric Matelyan edited this page Nov 26, 2024 · 13 revisions

In the Integration Rules scenario, Vault Spark allows developers to use the Vault Java SDK to send and receive messages from one vault (the source) to another (the target), whilst transforming the message on route using integration rules.

This page covers how to receive the messages in the target vault and process then using integration rules.

The Vault Java SDK QueueService, RecordService, Integration , IntegrationRuleService, HttpService, and QueryService interfaces are used to implement this functionality.

For full details on the interfaces and methods used, please review the Javadocs.

Details of how to create the Spark Messages in the source vault are covered in the page Integration Rules Source Messages.

Code Logic

Once the vSDKSendWarrantyUserAction action adds a Message to the outbound queue, Spark sends the Message to the target vault's inbound queue where it is processed by a MessageProcessor Vault Java SDK class.

The MessageProcessor then processes the messages by retrieving the integration rules, making an HttpService callouts to the source vault to query for more information, transforming the field data and then creating a record in the target vault.

In this example, vSDKIntRuleMessageProcessor contains the intRulesETLWarrantiesMessage method to process the inbound queue messages.

Key Concepts - Target Vault

  • There are 3 classes, the processor class and two User Defined Classes (UDC):
  • As a Message comes in, if an existing Claims Warranty record exists in the target vault it will be updated, otherwise a new record will be created.
    • Run a QueryService query to check for unprocessed records in the Integration Transaction object on the Source Vault.
      • The Message contains the source record ID
      • Target Claims Warranty records have a source_record__c populated
    • Run an IntegrationRuleService to retrieve integration rules for the Connection using the getIntegrationRules method, which can be used to construct a VQL query to retrieve data from the source vault.
      • Create a map of incoming messages
      • Check to make sure none of the objects already exist in the Target Vault
      • The source query fields are retrieve using the getQueryFields method
      • Create a query from the query object/fields for the configured Integration Rule, i.e. "warranty_int_rule_1__c"
      • The source record IDs are added to the query
      • Set the records field values in the target vault using values in "fieldRuleResults"
      • Get the resulting values for a row based on the evaluation of the rule against a json query response record
      • Set any additional custom fields
        • Manually add the source query field period_in_months__c from the related object warranty_period__cr
    • Run an HttpService callout from target to source vault to query for more data using the query constructed from the IntegrationRuleService
      • Evaluate the query response to return the field rule results for the Integration Rules
        • For each field rule response, update target fields using the value from the getTargetField method
      • Also update any custom fields in the target record
        • Using the field period_in_months__c from the [related object] sub-query, combined with the start_date__c, calculate and update the target record's end_date__c
        • Update the target record's integration_status__c to successfully_processed__c
  • Only non-empty Message records are processed.

Message Processor and Inbound Queue

The messages received from the queue have to be identified and processed according to custom logic. In this sample project, we only want to process messages that are not empty and have an object attribute that equals vsdk_warranty__c.

If they have these attributes, the message is pushed into a custom intRulesETLWarrantiesMessage method that processes the data and performs custom logic that is specific to a new vsdk_warranty__c message.

@MessageProcessorInfo()
public class vSDKIntRuleMessageProcessor implements MessageProcessor {

    /**
     * Main Spark v2v MessaageProcessor. It determines the Integration Point
     * related to the message and call the appropriate Processing code
     * which uses a combination of Integration Rules and SDK Logic
     * @param context is the MessageProcessor context
     */

	public void execute(MessageContext context) {

        //Processes a message from the queue which aren't blank
    	Message incomingMessage = context.getMessage();
        if (!incomingMessage.getMessageItems().isEmpty()) {

            // Here we want to process messages specific to the different integration
            // points. These message attributes are set in the user action
            // "vSdkSendWarrantyUserAction" in the source Vault
            String integrationPointApiName =
                    incomingMessage.getAttribute("integration_point",
                                                 MessageAttributeValueType.STRING);

            switch(integrationPointApiName) {
                case "receive_warranty__c":
                    // Here we want to process the messages associated with the inbound
                    // Integration Point "Receive Warranty". This is done in the method
                    // "intRulesETLWarrantiesMessage"
                    intRulesETLWarrantiesMessage(incomingMessage, context);
                    break;
                    // Add further case statements here if you wish to handle further
                    // integration points
            }
        }
    }

Processing the Message Items

Once the code has entered into the intRulesETLWarrantiesMessage method, the message items are parsed through and then the source record IDs are used to find existing Claims Warranty records in the target vault.

The code uses the vSDKIntRulesHelper.getUnprocessedObjects() method to get all unprocessed items from the Source Vault Integration Transaction object using a query against the records in that object. The query uses the Status field in the Integration Transaction records to get all pending_c records.

  • If records are found, add them to a etlMessageMap Map<String,Record> that is used to update the target based on the integration rules.
	public static void intRulesETLWarrantiesMessage(Message message,
                                                    MessageContext context) {

        LogService logService = ServiceLocator.locate(LogService.class);
        List<String> incomingMessageList = VaultCollections.newList();
        String sourceVaultId = context.getRemoteVaultId();
        String connectionId = context.getConnectionId();
        String integrationPointApiName =
                message.getAttribute("integration_point", MessageAttributeValueType.STRING);

        StringBuilder logMessage = new StringBuilder();
        logMessage.append("Processing integrationPointApiName: ").append(integrationPointApiName);
        logService.info(logMessage.toString());

        String connectionName = "vsdk_connection_to_warranties";
        String sourceObject = "vsdk_warranty__c";

        // Retrieve all items in the Message and put them in a Map.
        // The map is used to determine if the code wasn't able to find existing records.
        for (String sourceRecordId : message.getMessageItems()) {
            incomingMessageList.add(sourceRecordId);
            logMessage = new StringBuilder();
            logMessage.append("Incoming message item: ").append(sourceRecordId);
            logService.info(logMessage.toString());
        }

        // Check for items which haven't been migrated
        // This uses a callback to the source vault for the object
        // then add them to the incoming message list for processing
        List<String> unprocessedList = vSDKIntRulesHelper.getUnprocessedObjects(
                connectionName,
                sourceObject,
                integrationPointApiName);
        incomingMessageList.addAll(unprocessedList);

        vSDKIntRulesETLWarranties.process(
                incomingMessageList,
                sourceVaultId,
                sourceObject,
                connectionId,
                connectionName,
                integrationPointApiName);
    }
    ...
}

Extract and Transform Records

Now that the etlMessageMap has been created and populated, we can initiate the process of extracting data from the source vault, and using the integration rules to transforming into into a Claims Warranty target vault record, which will later be saved to the target vault. The map contains the source record IDs as keys with the target record as the values.

  public static void process(List<String> incomingMessageList,
                               String sourceVaultId,
                               String sourceObject,
                               String connectionId,
                               String connectionName,
                               String integrationPointApiName) {
	...
        if (etlMessageMap.size() > 0) {

            v2vIntRulesETLWarrantiesFromMessageMap(etlMessageMap,
                    connectionId,
                    connectionName,
                    sourceObject,
                    destinationObject,
                    integrationApiName,
                    integrationPointApiName);
        }

	...
}

First we need to instantiate the Integration Rules Service. We then use the instantiated Integration Rules Service to get all Integration Rules for the Connection. See the newGetIntegrationRulesRequestBuilder Java Doc for additional options.

    private static void v2vIntRulesETLWarrantiesFromMessageMap(Map<String, Record> recordsToETL,
                                                               String connectionId,
                                                               String connectionName,
                                                               String sourceObject,
                                                               String destinationObject,
                                                               String integrationApiName,
                                                               String integrationPointApiName) {
        ...

        // Instantiate the Integration Rule Service
        IntegrationRuleService integrationRuleService
                = ServiceLocator.locate(IntegrationRuleService.class);

        // Get integration rules for this connection
        GetIntegrationRulesRequest getIntegrationRulesRequest
                = integrationRuleService.newGetIntegrationRulesRequestBuilder()
                .withConnectionId(connectionId).build();
        GetIntegrationRulesResponse getIntegrationRulesResponse
                = integrationRuleService.getIntegrationRules(getIntegrationRulesRequest);	
	...
}
Iterate through each Integration Rule

Next, we need to iterate through each Integration Rule and get some details from the Warranty details on the Source Vault. We will loop through these until the desired IntegrationRule object warranty_int_rule_1__c is found. This is the integration rule that was created via MDL during setup.

Using the integration rules a VQL query will be dynamically created to return the source data. First the source object is retrieved from the integration rule, using the getFieldRules().iterator().next().getQueryObject(), which is used in the VQL FROM clause. Since all the field rules in this example contain the same source object, only one object is retrieved.

Next a collection of source fields is then populated using the getQueryFields() method, which are used to create a VQL SELECT clause.

In this example source data from another related relationship object warranty_period__cr is also mapped. As there isn't a way to configure this via integration rules, this is manually added the the VQL SELECT clause.

Finally a VQL WHERE clause is added to bring back just the required source records.

The end VQL query is in the form of:

SELECT name__v, customer_name__c, cover_start_date__c, extended_cover__c, manufacturer__c, (SELECT name__v, period_in_months__c FROM warranty_period__cr) FROM vsdk_warranty__c where id contains CONTAINS ('source_ids')

    private static void v2vIntRulesETLWarrantiesFromMessageMap(Map<String, Record> recordsToETL,
                                                               String connectionId,
                                                               String connectionName,
                                                               String sourceObject,
                                                               String destinationObject,
                                                               String integrationApiName,
                                                               String integrationPointApiName) {
        ...

        for (Iterator<IntegrationRule> intRulesiterator = intRules.iterator();
             intRulesiterator.hasNext();) {
            IntegrationRule intRuleResult = intRulesiterator.next();
            String intRulePointName = intRuleResult.getIntegrationPoint();

            // Process the rule if the rule exists with a status of active.
            if (intRulePointName.equals(integrationPointApiName)) {

                intRuleExists = true;
                String queryObject
                        = intRuleResult.getFieldRules().iterator().next().getQueryObject();
                Collection<String> queryFields = intRuleResult.getQueryFields(queryObject);

                // Create a query from the query object/fields for the configured
                // Integration Rule, i.e. "warranty_int_rule_1__c"
                StringBuilder query = new StringBuilder();
                query.append("SELECT ");
                query.append(String.join(",", queryFields));
                query.append(", (SELECT name__v, period_in_months__c"); // Manually add an custom fields not
                                                                        // included in the integration rules
                query.append(" FROM warranty_period__cr)");
                query.append(" FROM ").append(queryObject);
                query.append(" WHERE id CONTAINS ('");
                query.append(String.join("','", recordsToETL.keySet())).append("')");
        ...

The value retrieved from the query will be used later to calculate values before storing them in a Claims Warranty record on the Target Vault.

Query Data from Source

Now a VQL query has been built from the integration rules, an HttpRequest is now setup and ready to be sent to the source vault. With the /api/{version}/query endpoint, we want to receive JSON data back so you send the request with the HttpResponseBodyValueType.JSONDATA parameter value.

When the HttpRequest response from the source vault is received, it can be parsed through using the JsonService class.

public static void v2vIntRulesExtractAndTransformWarranties(Map<String,Record> recordsToExtractAndTransform, String connectionId, String destinationObject) {
	...
            //The configured connection provides the full DNS name.
            //For the path, you only need to append the API endpoint after the DNS.
            //The query endpoint takes a POST where the BODY is the query itself.
            request.setMethod(HttpMethod.POST);
            request.appendPath("/api/v20.1/query");
            request.setHeader("Content-Type", "application/x-www-form-urlencoded");
            request.setHeader("X-VaultAPI-DescribeQuery", "true"); // *** New
            request.setBodyParam("q", query.toString());
	...
}
Map via Integration Rules

The response values are used to update the fields in the Claims Warranty record in the target vault via using the integration rules.

  • First, the response is parsed into a JsonData object
  • From the response, the getJsonObject() will get the response as a parseable JsonObject
    • Here the getValue method can be used to retrieve responseStatus, responseDetails, and data
  • The data element is an array of JSON data. This is parsed into a JsonArray object.
    • Each queried record is returned as an element of the array and must be parsed into a JsonObject.
    • evaluateFieldRules() then creates a collection of evaluated FieldRuleResults from the queried records.
    • Individual target fields can then be set from the evaluated FieldRuleResults.
  • Target fields can also be manually populated without the use of integration rules
    • The warranty_period__cr subquery is returned in the query data as a JsonObject from which the period_in_months__c can be retrieved.
    • The target records end_date__c field is set to the by adding the period_in_months__c minus a day to the start_date__c
    • The target records integration_status__c field is set to processing_successful__c.
private static void v2vIntRulesETLWarrantiesFromMessageMap(Map<String, Record> recordsToETL,
                                                               String connectionId,
                                                               String connectionName,
                                                               String sourceObject,
                                                               String destinationObject,
                                                               String integrationApiName,
                                                               String integrationPointApiName) {
	...
        httpService.sendRequest(request, HttpResponseBodyValueType.JSONDATA)
                .onSuccess(httpResponse -> {

                    JsonData response = httpResponse.getResponseBody();

                    if (response.isValidJson()) {
                        String responseStatus = response.getJsonObject().getValue("responseStatus", JsonValueType.STRING);

                        if (responseStatus.equals("SUCCESS")) {
                            logService.info("HTTP Query Request: SUCCESS");

                            JsonArray data = response.getJsonObject().getValue("data", JsonValueType.ARRAY);
                            JsonObject queryDescribe = response.getJsonObject().getValue("queryDescribe",
                                                                                         JsonValueType.OBJECT);

                            //Retrieve each record returned from the VQL query.
                            //Each element of the returned `data` JsonArray is a record with it's queried fields.
                            for (int i = 0; i < data.getSize();i++) {
                                Boolean fieldRuleErrorOccurred = false;

                                JsonObject queryRecord = data.getValue(i, JsonValueType.OBJECT);
                                String sourceId = queryRecord.getValue("id", JsonValueType.STRING);

                                // Get the resulting values for a row based on the evaluation of the rule 
                                // against a json query response record
                                logService.info("Evaluate field rules request for rule: " + intRuleResult.getName());
                                EvaluateFieldRulesRequest evaluateFieldRulesRequest
                                    = integrationRuleService.newEvaluateFieldRulesRequestBuilder()
                                      .withIntegrationRule(intRuleResult)
                                      .withTargetName(destinationObject)
                                      .withQueryDescribe(queryDescribe)
                                      .withQueryData(queryRecord)
                                      .build();
                                EvaluateFieldRulesResponse evaluateFieldRulesResponse
                                    = integrationRuleService.evaluateFieldRules(evaluateFieldRulesRequest);
                                Collection<FieldRuleResult> fieldRuleResults
                                    = evaluateFieldRulesResponse.getFieldRuleResults();

                                // Set the records field values in the target vault using values in "fieldRuleResults"
                                for (Iterator<FieldRuleResult> fieldRuleResultIterator
                                     = fieldRuleResults.iterator(); fieldRuleResultIterator.hasNext(); ) {
                                    FieldRuleResult fieldRuleResult = fieldRuleResultIterator.next();
                                    if (!fieldRuleResult.hasError()) {
                                        recordsToETL.get(sourceId).setValue(
                                        fieldRuleResult.getTargetField(), fieldRuleResult.getValue());
                                    } else {
                                        FieldRuleError error = fieldRuleResult.getError();

                                        vSDKIntRulesHelper.createUserExceptionMessage(
                                            integrationApiName,
                                            integrationPointApiName,
                                            error.getMessage(),
                                            "vSDKIntRulesETLWarranties",
                                            sourceId);

                                        // Do handling based on error type and message
                                        ItemErrorType errorType = error.getType();
                                        String errorMessage = error.getMessage();
                                        logService.info("Record field error type: " + errorType + "," +
                                                            " message: " + errorMessage);

                                        // Remove the failed record so it's not processed until the field
                                        // error is resolved
                                        recordsToETL.remove(sourceId);

                                        // Log the failed to update records
                                                    failedIdList.add(sourceId);
                                                    fieldRuleErrorOccurred = true;
                                                    break;
                                    }
                                }

                                if (!fieldRuleErrorOccurred) {

                                    //
                                    // Set any additional custom fields
                                    //

                                    // If desired query results can be used to calculate custom fields.
                                    // For instance the period_in_months__c field within the warranty_period__cr subquery
                                    // can be retrieved via a JsonObject then JsonArray.
                                    // This is basically an embedded JSON response.

                                    JsonObject subquery = queryRecord.getValue("warranty_period__cr",
                                                                               JsonValueType.OBJECT);
                                    JsonArray subqueryData = subquery.getValue("data", JsonValueType.ARRAY);
                                    JsonObject subqueryRecord = subqueryData.getValue(0, JsonValueType.OBJECT);
                                    Integer warrantyPeriodMonths = Integer.parseInt(subqueryRecord.getValue(
                                                        "period_in_months__c", JsonValueType.STRING));

                                    // Now calculate the end_date__c by adding the warranty period minus a
                                    // day to the start_date__c
                                    LocalDate startDate = recordsToETL.get(sourceId).getValue(
                                                "start_date__c", ValueType.DATE);
                                    recordsToETL.get(sourceId).setValue("end_date__c",
                                                startDate.plusMonths(warrantyPeriodMonths).minusDays(1));
  
                                    // List values can also be populated, such as the integration status in
                                    // this example.
                                    recordsToETL.get(sourceId).setValue("integration_status__c",
                                            VaultCollections.asList("processing_successful__c"));

                                    // Log the successfully updated records
                                    successfulIdList.add(sourceId);
                                }
                            }

                            data = null;
                        }
                        else {
                             for (String key : recordsToETL.keySet()) {
                                 failedIdList.add(key);
                             }
                        }
                        response = null;
                    }
               })
              .onError(httpOperationError -> {
                  for (String key : recordsToETL.keySet()) {
                      failedIdList.add(key);
                  }
              }).execute();
	...
}
Save the transformed records using batchSaveRecords

We then need to save the transformed records. In this case, to deal with potential volume we use the batchSaveRecord method from the RecordService.

First, we add all of the recordsToETL values to the newly created transformedRecordlist.

Next we define and run the batchSaveRecords using the transformedRecordList values and handle any errors. In this case we are using the UserExceptions Spark component to surface the errors to an administrative (non-technical) users. The UserExceptions are implemented as Object records, so the RecordService is used to create the UserException records.

            List<Record> transformedRecordList = VaultCollections.newList();
            if (recordsToETL.size() > 0) {
                transformedRecordList.addAll(recordsToETL.values());
                recordsToETL.clear();

                recordService.batchSaveRecords(transformedRecordList).onSuccesses(successMessage -> {

                    List<Record> successfulRecords = VaultCollections.newList();
                    successMessage.stream().forEach(positionalRecordId -> {
                        Record record = recordService.newRecordWithId(destinationObject,
                                positionalRecordId.getRecordId());
                        successfulRecords.add(record);
                    });

                }).onErrors(batchOperationErrors -> {
                    batchOperationErrors.stream().forEach(error -> {
                        String errMsg = error.getError().getMessage();
                        int errPosition = error.getInputPosition();
                        String sourceId = transformedRecordList.get(errPosition).getValue("source_record__c", 
                                                                                          ValueType.STRING);

                        // Create a User Exception per record error, as each one may be different
                        //
                        // An example of when this error might occur would be if a mandatory field in the
                        // target object wasn't populated.
                        StringBuilder logMessage = new StringBuilder();
                        logMessage.append("Unable to create '")
                                        .append(transformedRecordList.get(errPosition).getObjectName())
                                        .append("' record, because of '").append(errMsg).append("'.");

                        vSDKIntRulesHelper.createUserExceptionMessage(
                            integrationApiName,
                            integrationPointApiName,
                            logMessage.toString(),
                            "vSDKIntRulesETLWarranties",
                            sourceId);

                        // Move the failed record from the successful to failed list
                        successfulIdList.remove(sourceId);
                        failedIdList.add(sourceId);

                    });
                }).execute();
Final Cleanup

Lastly, we need to set the processed objects to processed_c status on the Source Vault

private static void v2vIntRulesETLWarrantiesFromMessageMap(Map<String, Record> recordsToETL,
                                                               String connectionId,
                                                               String connectionName,
                                                               String sourceObject,
                                                               String destinationObject,
                                                               String integrationApiName,
                                                               String integrationPointApiName) {
	...
        // Update the source system to say whether records have updated successfully or not,
        // by making a callback to the source vault and updating the integration transaction
        // record associated with the source record.
        vSDKIntRulesHelper.setIntTransProcessedStatuses(
                successfulIdList,
                connectionName, // i.e. vsdk_connection_to_warranties
                queryObject, // i.e. vsdk_warranty__c
                integrationPointApiName,
                true);

        vSDKIntRulesHelper.setIntTransProcessedStatuses(
                failedIdList,
                connectionName, // i.e. vsdk_connection_to_warranties
                queryObject, // i.e. vsdk_warranty__c
                integrationPointApiName,
                false);
	...
}