Integration Rules Message Processing
In the Integration Rules scenario, Vault Spark allows developers to use the Vault Java SDK to send and receive messages from one vault (the source) to another (the target), whilst transforming the message en route using integration rules.
This page covers how to receive the messages in the target vault and process them using integration rules.
The Vault Java SDK QueueService, RecordService, Integration, IntegrationRuleService, HttpService, and QueryService interfaces are used to implement this functionality.
For full details on the interfaces and methods used, please review the Javadocs.
Details of how to create the Spark Messages in the source vault are covered in the page Integration Rules Source Messages.
Once the vSDKSendWarrantyUserAction action adds a Message to the outbound queue, Spark sends the Message to the target vault's inbound queue, where it is processed by a MessageProcessor Vault Java SDK class.
The MessageProcessor then processes the messages by retrieving the integration rules, making an HttpService callout to the source vault to query for more information, transforming the field data, and then creating a record in the target vault.
In this example, vSDKIntRuleMessageProcessor contains the intRulesETLWarrantiesMessage method to process the inbound queue messages.
- There are 3 classes, the processor class and two User Defined Classes (UDC):
  - The vSDKIntRuleMessageProcessor processes Message records received in the inbound queue, vsdk_warranty_in_queue__c.
  - The vSDKIntRulesHelper provides helper methods to process objects from the Integration Transaction object on the Source Vault.
  - The vSDKIntRulesETLWarranties processes the transformation using Integration Rules.
- As a Message comes in, if an existing Claims Warranty record exists in the target vault it will be updated, otherwise a new record will be created.
  - Run a QueryService query to check for unprocessed records in the Integration Transaction object on the Source Vault.
    - The Message contains the source record ID
    - Target Claims Warranty records have a source_record__c populated
  - Run an IntegrationRuleService to retrieve integration rules for the Connection using the getIntegrationRules method, which can be used to construct a VQL query to retrieve data from the source vault.
    - Create a map of incoming messages
    - Check to make sure none of the objects already exist in the Target Vault
    - The source query fields are retrieved using the getQueryFields method
    - Create a query from the query object/fields for the configured Integration Rule, i.e. "warranty_int_rule_1__c"
    - The source record IDs are added to the query
    - Set the records field values in the target vault using values in "fieldRuleResults"
    - Get the resulting values for a row based on the evaluation of the rule against a json query response record
    - Set any additional custom fields
      - Manually add the source query field period_in_months__c from the related object warranty_period__cr
  - Run an HttpService callout from target to source vault to query for more data using the query constructed from the IntegrationRuleService
    - Evaluate the query response to return the field rule results for the Integration Rules
      - For each field rule response, update target fields using the value from the getTargetField method
    - Also update any custom fields in the target record
      - Using the field period_in_months__c from the [related object] sub-query, combined with the start_date__c, calculate and update the target record's end_date__c
      - Update the target record's integration_status__c to processing_successful__c
- Only non-empty Message records are processed.
The messages received from the queue have to be identified and processed according to custom logic. In this sample project, we only want to process messages that are not empty and have an integration_point attribute that equals receive_warranty__c.
If they have these attributes, the message is passed to a custom intRulesETLWarrantiesMessage method that processes the data and performs custom logic that is specific to a new vsdk_warranty__c message.
@MessageProcessorInfo()
public class vSDKIntRuleMessageProcessor implements MessageProcessor {
    /**
     * Main Spark v2v MessageProcessor. It determines the Integration Point
     * related to the message and calls the appropriate processing code,
     * which uses a combination of Integration Rules and SDK logic.
     * @param context is the MessageProcessor context
     */
    public void execute(MessageContext context) {
        // Only process messages from the queue that aren't blank
        Message incomingMessage = context.getMessage();
        if (!incomingMessage.getMessageItems().isEmpty()) {
            // Here we want to process messages specific to the different integration
            // points. These message attributes are set in the user action
            // "vSDKSendWarrantyUserAction" in the source Vault
            String integrationPointApiName =
                    incomingMessage.getAttribute("integration_point",
                            MessageAttributeValueType.STRING);
            switch (integrationPointApiName) {
                case "receive_warranty__c":
                    // Here we want to process the messages associated with the inbound
                    // Integration Point "Receive Warranty". This is done in the method
                    // "intRulesETLWarrantiesMessage"
                    intRulesETLWarrantiesMessage(incomingMessage, context);
                    break;
                // Add further case statements here if you wish to handle further
                // integration points
            }
        }
    }
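The filtering and dispatch logic above can be exercised outside Vault with a plain Java sketch. `SimpleMessage` and `route` are hypothetical stand-ins (they are not part of the Vault Java SDK), and the sketch assumes a message without an `integration_point` attribute should be skipped rather than cause an error:

```java
import java.util.List;
import java.util.Map;

final class MessageDispatchSketch {

    /** Hypothetical stand-in for an SDK Message: items plus string attributes. */
    static final class SimpleMessage {
        final List<String> items;
        final Map<String, String> attributes;

        SimpleMessage(List<String> items, Map<String, String> attributes) {
            this.items = items;
            this.attributes = attributes;
        }
    }

    /** Returns the name of the handler that would run, or "skipped". */
    static String route(SimpleMessage message) {
        if (message.items.isEmpty()) {
            return "skipped"; // empty messages are ignored
        }
        String integrationPoint = message.attributes.get("integration_point");
        if (integrationPoint == null) {
            return "skipped"; // a switch on a null String would throw
        }
        switch (integrationPoint) {
            case "receive_warranty__c":
                return "intRulesETLWarrantiesMessage";
            default:
                return "skipped"; // unknown integration points are ignored
        }
    }
}
```

The explicit null check is a design choice of this sketch: a Java `switch` on a null `String` throws a `NullPointerException`, so a missing attribute needs to be handled before the `switch`.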
Once the code has entered the intRulesETLWarrantiesMessage method, the message items are iterated and the source record IDs are used to find existing Claims Warranty records in the target vault.
The code uses the vSDKIntRulesHelper.getUnprocessedObjects() method to get all unprocessed items from the Source Vault Integration Transaction object using a query against the records in that object. The query uses the Status field in the Integration Transaction records to get all pending_c records.
- If records are found, add them to an etlMessageMap (Map<String,Record>) that is used to update the target based on the integration rules.
    public static void intRulesETLWarrantiesMessage(Message message,
                                                    MessageContext context) {
        LogService logService = ServiceLocator.locate(LogService.class);
        List<String> incomingMessageList = VaultCollections.newList();
        String sourceVaultId = context.getRemoteVaultId();
        String connectionId = context.getConnectionId();
        String integrationPointApiName =
                message.getAttribute("integration_point", MessageAttributeValueType.STRING);
        StringBuilder logMessage = new StringBuilder();
        logMessage.append("Processing integrationPointApiName: ").append(integrationPointApiName);
        logService.info(logMessage.toString());
        String connectionName = "vsdk_connection_to_warranties";
        String sourceObject = "vsdk_warranty__c";
        // Retrieve all items in the Message and put them in a List.
        // The list is later used to determine which records need to be processed.
        for (String sourceRecordId : message.getMessageItems()) {
            incomingMessageList.add(sourceRecordId);
            logMessage = new StringBuilder();
            logMessage.append("Incoming message item: ").append(sourceRecordId);
            logService.info(logMessage.toString());
        }
        // Check for items which haven't been migrated.
        // This uses a callback to the source vault for the object,
        // then adds them to the incoming message list for processing.
        List<String> unprocessedList = vSDKIntRulesHelper.getUnprocessedObjects(
                connectionName,
                sourceObject,
                integrationPointApiName);
        incomingMessageList.addAll(unprocessedList);
        vSDKIntRulesETLWarranties.process(
                incomingMessageList,
                sourceVaultId,
                sourceObject,
                connectionId,
                connectionName,
                integrationPointApiName);
    }
    ...
}
Now that the etlMessageMap has been created and populated, we can initiate the process of extracting data from the source vault and using the integration rules to transform it into a Claims Warranty target vault record, which will later be saved to the target vault. The map contains the source record IDs as keys with the target records as the values.
public static void process(List<String> incomingMessageList,
                           String sourceVaultId,
                           String sourceObject,
                           String connectionId,
                           String connectionName,
                           String integrationPointApiName) {
    ...
    if (etlMessageMap.size() > 0) {
        v2vIntRulesETLWarrantiesFromMessageMap(etlMessageMap,
                connectionId,
                connectionName,
                sourceObject,
                destinationObject,
                integrationApiName,
                integrationPointApiName);
    }
    ...
}
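The code that populates etlMessageMap is elided in the excerpt above, so the following is only an illustrative sketch of the update-or-create idea described in the text, written in plain Java. `TargetRecord`, `build`, and the `existingTargetIdBySourceId` lookup are hypothetical names; in the real project the lookup would come from a query against the target vault, and the values would be SDK Record objects.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class EtlMessageMapSketch {

    /** Hypothetical stand-in for a target record; a null id means "create new". */
    static final class TargetRecord {
        final String id;
        final String sourceRecordId;

        TargetRecord(String id, String sourceRecordId) {
            this.id = id;
            this.sourceRecordId = sourceRecordId;
        }
    }

    /**
     * Builds a map keyed by source record ID. When a target record already
     * exists for a source ID it is reused (update); otherwise a new,
     * id-less record is prepared (create).
     */
    static Map<String, TargetRecord> build(List<String> incomingSourceIds,
                                           Map<String, String> existingTargetIdBySourceId) {
        Map<String, TargetRecord> etlMessageMap = new LinkedHashMap<>();
        for (String sourceId : incomingSourceIds) {
            // get() returns null when no target record exists yet
            String existingTargetId = existingTargetIdBySourceId.get(sourceId);
            etlMessageMap.put(sourceId, new TargetRecord(existingTargetId, sourceId));
        }
        return etlMessageMap;
    }
}
```

Keying the map by source record ID also means that a source ID appearing twice in the incoming list (for example, once in the message and once in the unprocessed list) ends up as a single entry.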
First we need to instantiate the Integration Rules Service. We then use the instantiated Integration Rules Service to get all Integration Rules for the Connection. See the newGetIntegrationRulesRequestBuilder Javadoc for additional options.
private static void v2vIntRulesETLWarrantiesFromMessageMap(Map<String, Record> recordsToETL,
                                                           String connectionId,
                                                           String connectionName,
                                                           String sourceObject,
                                                           String destinationObject,
                                                           String integrationApiName,
                                                           String integrationPointApiName) {
    ...
    // Instantiate the Integration Rule Service
    IntegrationRuleService integrationRuleService
            = ServiceLocator.locate(IntegrationRuleService.class);
    // Get integration rules for this connection
    GetIntegrationRulesRequest getIntegrationRulesRequest
            = integrationRuleService.newGetIntegrationRulesRequestBuilder()
                    .withConnectionId(connectionId).build();
    GetIntegrationRulesResponse getIntegrationRulesResponse
            = integrationRuleService.getIntegrationRules(getIntegrationRulesRequest);
    ...
}
Next, we need to iterate through each Integration Rule to work out which Warranty data to retrieve from the Source Vault. We loop through the rules until the desired IntegrationRule object, warranty_int_rule_1__c, is found. This is the integration rule that was created via MDL during setup.
Using the integration rules, a VQL query is dynamically created to return the source data. First the source object is retrieved from the integration rule using getFieldRules().iterator().next().getQueryObject(), and is used in the VQL FROM clause. Since all the field rules in this example contain the same source object, only one object is retrieved.
Next, a collection of source fields is populated using the getQueryFields() method; these fields are used to create the VQL SELECT clause.
In this example, source data from a related object, reached through the relationship warranty_period__cr, is also mapped. As there isn't a way to configure this via integration rules, it is manually added to the VQL SELECT clause.
Finally, a VQL WHERE clause is added to bring back just the required source records.
The end VQL query is in the form of:
SELECT name__v, customer_name__c, cover_start_date__c, extended_cover__c, manufacturer__c, (SELECT name__v, period_in_months__c FROM warranty_period__cr) FROM vsdk_warranty__c WHERE id CONTAINS ('source_ids')
private static void v2vIntRulesETLWarrantiesFromMessageMap(Map<String, Record> recordsToETL,
                                                           String connectionId,
                                                           String connectionName,
                                                           String sourceObject,
                                                           String destinationObject,
                                                           String integrationApiName,
                                                           String integrationPointApiName) {
    ...
    for (Iterator<IntegrationRule> intRulesiterator = intRules.iterator();
         intRulesiterator.hasNext();) {
        IntegrationRule intRuleResult = intRulesiterator.next();
        String intRulePointName = intRuleResult.getIntegrationPoint();
        // Process the rule if the rule exists with a status of active.
        if (intRulePointName.equals(integrationPointApiName)) {
            intRuleExists = true;
            String queryObject
                    = intRuleResult.getFieldRules().iterator().next().getQueryObject();
            Collection<String> queryFields = intRuleResult.getQueryFields(queryObject);
            // Create a query from the query object/fields for the configured
            // Integration Rule, i.e. "warranty_int_rule_1__c"
            StringBuilder query = new StringBuilder();
            query.append("SELECT ");
            query.append(String.join(",", queryFields));
            // Manually add any custom fields not included in the integration rules
            query.append(", (SELECT name__v, period_in_months__c");
            query.append(" FROM warranty_period__cr)");
            query.append(" FROM ").append(queryObject);
            query.append(" WHERE id CONTAINS ('");
            query.append(String.join("','", recordsToETL.keySet())).append("')");
            ...
The value retrieved from the query will be used later to calculate values before storing them in a Claims Warranty record on the Target Vault.
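To see exactly what string the query-building steps produce, the same StringBuilder logic can be run standalone. This is a minimal sketch: the `VqlQuerySketch` class and `buildQuery` method are hypothetical wrappers, and the field names and record IDs in the usage note are made-up sample values rather than output from a real integration rule.

```java
import java.util.Collection;

final class VqlQuerySketch {

    /**
     * Assembles the VQL query in the same order as the sample code:
     * SELECT fields, manually added subquery, FROM object, WHERE id filter.
     */
    static String buildQuery(String queryObject,
                             Collection<String> queryFields,
                             Collection<String> sourceIds) {
        StringBuilder query = new StringBuilder();
        query.append("SELECT ");
        query.append(String.join(",", queryFields));
        // Subquery on the related object, added by hand because it is not
        // configurable through integration rules
        query.append(", (SELECT name__v, period_in_months__c");
        query.append(" FROM warranty_period__cr)");
        query.append(" FROM ").append(queryObject);
        query.append(" WHERE id CONTAINS ('");
        query.append(String.join("','", sourceIds)).append("')");
        return query.toString();
    }
}
```

For example, calling `buildQuery("vsdk_warranty__c", Arrays.asList("id", "name__v"), Arrays.asList("V01", "V02"))` yields `SELECT id,name__v, (SELECT name__v, period_in_months__c FROM warranty_period__cr) FROM vsdk_warranty__c WHERE id CONTAINS ('V01','V02')`. Note that the ID list is joined with `','` so that each ID ends up individually quoted.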
Now that a VQL query has been built from the integration rules, an HttpRequest is set up and ready to be sent to the source vault. With the /api/{version}/query endpoint, we want to receive JSON data back, so we send the request with the HttpResponseBodyValueType.JSONDATA parameter value.
When the response to the HttpRequest is received from the source vault, it can be parsed using the JsonService class.
public static void v2vIntRulesExtractAndTransformWarranties(
        Map<String,Record> recordsToExtractAndTransform,
        String connectionId,
        String destinationObject) {
    ...
    // The configured connection provides the full DNS name.
    // For the path, you only need to append the API endpoint after the DNS.
    // The query endpoint takes a POST where the BODY is the query itself.
    request.setMethod(HttpMethod.POST);
    request.appendPath("/api/v20.1/query");
    request.setHeader("Content-Type", "application/x-www-form-urlencoded");
    // Ask for the queryDescribe metadata to be included in the response
    request.setHeader("X-VaultAPI-DescribeQuery", "true");
    request.setBodyParam("q", query.toString());
    ...
}
The response values are used to update the fields in the Claims Warranty record in the target vault using the integration rules.
- First, the response is parsed into a JsonData object
- From the response, getJsonObject() will get the response as a parseable JsonObject
  - Here the getValue method can be used to retrieve responseStatus, responseDetails, and data
- The data element is an array of JSON data. This is parsed into a JsonArray object.
  - Each queried record is returned as an element of the array and must be parsed into a JsonObject.
  - evaluateFieldRules() then creates a collection of evaluated FieldRuleResults from the queried records.
  - Individual target fields can then be set from the evaluated FieldRuleResults.
- Target fields can also be manually populated without the use of integration rules
  - The warranty_period__cr subquery is returned in the query data as a JsonObject from which the period_in_months__c can be retrieved.
  - The target record's end_date__c field is set by adding the period_in_months__c minus a day to the start_date__c
  - The target record's integration_status__c field is set to processing_successful__c.
private static void v2vIntRulesETLWarrantiesFromMessageMap(Map<String, Record> recordsToETL,
                                                           String connectionId,
                                                           String connectionName,
                                                           String sourceObject,
                                                           String destinationObject,
                                                           String integrationApiName,
                                                           String integrationPointApiName) {
    ...
    httpService.sendRequest(request, HttpResponseBodyValueType.JSONDATA)
        .onSuccess(httpResponse -> {
            JsonData response = httpResponse.getResponseBody();
            if (response.isValidJson()) {
                String responseStatus = response.getJsonObject().getValue("responseStatus", JsonValueType.STRING);
                if (responseStatus.equals("SUCCESS")) {
                    logService.info("HTTP Query Request: SUCCESS");
                    JsonArray data = response.getJsonObject().getValue("data", JsonValueType.ARRAY);
                    JsonObject queryDescribe = response.getJsonObject().getValue("queryDescribe",
                            JsonValueType.OBJECT);
                    // Retrieve each record returned from the VQL query.
                    // Each element of the returned `data` JsonArray is a record with its queried fields.
                    for (int i = 0; i < data.getSize(); i++) {
                        boolean fieldRuleErrorOccurred = false;
                        JsonObject queryRecord = data.getValue(i, JsonValueType.OBJECT);
                        String sourceId = queryRecord.getValue("id", JsonValueType.STRING);
                        // Get the resulting values for a row based on the evaluation of the rule
                        // against a json query response record
                        logService.info("Evaluate field rules request for rule: " + intRuleResult.getName());
                        EvaluateFieldRulesRequest evaluateFieldRulesRequest
                                = integrationRuleService.newEvaluateFieldRulesRequestBuilder()
                                        .withIntegrationRule(intRuleResult)
                                        .withTargetName(destinationObject)
                                        .withQueryDescribe(queryDescribe)
                                        .withQueryData(queryRecord)
                                        .build();
                        EvaluateFieldRulesResponse evaluateFieldRulesResponse
                                = integrationRuleService.evaluateFieldRules(evaluateFieldRulesRequest);
                        Collection<FieldRuleResult> fieldRuleResults
                                = evaluateFieldRulesResponse.getFieldRuleResults();
                        // Set the records field values in the target vault using values in "fieldRuleResults"
                        for (Iterator<FieldRuleResult> fieldRuleResultIterator
                             = fieldRuleResults.iterator(); fieldRuleResultIterator.hasNext(); ) {
                            FieldRuleResult fieldRuleResult = fieldRuleResultIterator.next();
                            if (!fieldRuleResult.hasError()) {
                                recordsToETL.get(sourceId).setValue(
                                        fieldRuleResult.getTargetField(), fieldRuleResult.getValue());
                            } else {
                                FieldRuleError error = fieldRuleResult.getError();
                                vSDKIntRulesHelper.createUserExceptionMessage(
                                        integrationApiName,
                                        integrationPointApiName,
                                        error.getMessage(),
                                        "vSDKIntRulesETLWarranties",
                                        sourceId);
                                // Do handling based on error type and message
                                ItemErrorType errorType = error.getType();
                                String errorMessage = error.getMessage();
                                logService.info("Record field error type: " + errorType + "," +
                                        " message: " + errorMessage);
                                // Remove the failed record so it's not processed until the field
                                // error is resolved
                                recordsToETL.remove(sourceId);
                                // Log the records that failed to update
                                failedIdList.add(sourceId);
                                fieldRuleErrorOccurred = true;
                                break;
                            }
                        }
                        if (!fieldRuleErrorOccurred) {
                            //
                            // Set any additional custom fields
                            //
                            // If desired, query results can be used to calculate custom fields.
                            // For instance the period_in_months__c field within the warranty_period__cr subquery
                            // can be retrieved via a JsonObject then JsonArray.
                            // This is basically an embedded JSON response.
                            JsonObject subquery = queryRecord.getValue("warranty_period__cr",
                                    JsonValueType.OBJECT);
                            JsonArray subqueryData = subquery.getValue("data", JsonValueType.ARRAY);
                            JsonObject subqueryRecord = subqueryData.getValue(0, JsonValueType.OBJECT);
                            Integer warrantyPeriodMonths = Integer.parseInt(subqueryRecord.getValue(
                                    "period_in_months__c", JsonValueType.STRING));
                            // Now calculate the end_date__c by adding the warranty period minus a
                            // day to the start_date__c
                            LocalDate startDate = recordsToETL.get(sourceId).getValue(
                                    "start_date__c", ValueType.DATE);
                            recordsToETL.get(sourceId).setValue("end_date__c",
                                    startDate.plusMonths(warrantyPeriodMonths).minusDays(1));
                            // List values can also be populated, such as the integration status in
                            // this example.
                            recordsToETL.get(sourceId).setValue("integration_status__c",
                                    VaultCollections.asList("processing_successful__c"));
                            // Log the successfully updated records
                            successfulIdList.add(sourceId);
                        }
                    }
                    data = null;
                } else {
                    // The query did not succeed, so mark every record as failed
                    for (String key : recordsToETL.keySet()) {
                        failedIdList.add(key);
                    }
                }
                response = null;
            }
        })
        .onError(httpOperationError -> {
            // The HTTP callout itself failed, so mark every record as failed
            for (String key : recordsToETL.keySet()) {
                failedIdList.add(key);
            }
        }).execute();
    ...
}
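The end date arithmetic used above (start date plus the warranty period, minus one day) relies only on `java.time.LocalDate`, so it can be checked in isolation. The `WarrantyEndDateSketch` class and `endDate` method below are hypothetical names used for illustration only.

```java
import java.time.LocalDate;

final class WarrantyEndDateSketch {

    /**
     * Returns the last day of cover: the start date plus the warranty
     * period in months, minus one day.
     */
    static LocalDate endDate(LocalDate startDate, int periodInMonths) {
        return startDate.plusMonths(periodInMonths).minusDays(1);
    }
}
```

A 12 month warranty starting on 2020-01-01 ends on 2020-12-31. One behaviour worth knowing is that `plusMonths` clamps to the last valid day of the resulting month, so a 1 month warranty starting on 2020-01-31 is first moved to 2020-02-29 and therefore ends on 2020-02-28.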
We then need to save the transformed records. In this case, to deal with potential volume, we use the batchSaveRecords method from the RecordService.
First, we add all of the recordsToETL values to the newly created transformedRecordList.
Next we define and run batchSaveRecords using the transformedRecordList values and handle any errors. In this case we are using the UserExceptions Spark component to surface the errors to administrative (non-technical) users. The UserExceptions are implemented as Object records, so the RecordService is used to create the UserException records.
List<Record> transformedRecordList = VaultCollections.newList();
if (recordsToETL.size() > 0) {
    transformedRecordList.addAll(recordsToETL.values());
    recordsToETL.clear();
    recordService.batchSaveRecords(transformedRecordList).onSuccesses(successMessage -> {
        List<Record> successfulRecords = VaultCollections.newList();
        successMessage.stream().forEach(positionalRecordId -> {
            Record record = recordService.newRecordWithId(destinationObject,
                    positionalRecordId.getRecordId());
            successfulRecords.add(record);
        });
    }).onErrors(batchOperationErrors -> {
        batchOperationErrors.stream().forEach(error -> {
            String errMsg = error.getError().getMessage();
            // The input position identifies which record in the batch failed
            int errPosition = error.getInputPosition();
            String sourceId = transformedRecordList.get(errPosition).getValue("source_record__c",
                    ValueType.STRING);
            // Create a User Exception per record error, as each one may be different
            //
            // An example of when this error might occur would be if a mandatory field in the
            // target object wasn't populated.
            StringBuilder logMessage = new StringBuilder();
            logMessage.append("Unable to create '")
                    .append(transformedRecordList.get(errPosition).getObjectName())
                    .append("' record, because of '").append(errMsg).append("'.");
            vSDKIntRulesHelper.createUserExceptionMessage(
                    integrationApiName,
                    integrationPointApiName,
                    logMessage.toString(),
                    "vSDKIntRulesETLWarranties",
                    sourceId);
            // Move the failed record from the successful to failed list
            successfulIdList.remove(sourceId);
            failedIdList.add(sourceId);
        });
    }).execute();
}
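The error handling above maps each batch error back to its source record through the error's input position. A plain Java sketch of that bookkeeping, with the SDK types replaced by simple lists, might look like the following. `BatchErrorSketch` and `applyErrors` are hypothetical names, and the list of error positions stands in for the values returned by getInputPosition().

```java
import java.util.List;

final class BatchErrorSketch {

    /**
     * Moves the source IDs at the given batch input positions from the
     * successful list to the failed list.
     *
     * @param inputSourceIds source IDs in the same order as the saved batch
     * @param errorPositions zero-based positions of the records that failed
     */
    static void applyErrors(List<String> inputSourceIds,
                            List<Integer> errorPositions,
                            List<String> successfulIds,
                            List<String> failedIds) {
        for (int position : errorPositions) {
            String sourceId = inputSourceIds.get(position);
            // remove(Object) is used here, not remove(int), because sourceId is a String
            successfulIds.remove(sourceId);
            failedIds.add(sourceId);
        }
    }
}
```

This only works because the batch is saved in the same order as the list used for the lookup, which is why the sample code reads the failed record from transformedRecordList rather than from the (already cleared) recordsToETL map.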
Lastly, we need to set the processed objects to processed_c status on the Source Vault.
private static void v2vIntRulesETLWarrantiesFromMessageMap(Map<String, Record> recordsToETL,
                                                           String connectionId,
                                                           String connectionName,
                                                           String sourceObject,
                                                           String destinationObject,
                                                           String integrationApiName,
                                                           String integrationPointApiName) {
    ...
    // Update the source system to say whether records have updated successfully or not,
    // by making a callback to the source vault and updating the integration transaction
    // record associated with the source record.
    vSDKIntRulesHelper.setIntTransProcessedStatuses(
            successfulIdList,
            connectionName, // i.e. vsdk_connection_to_warranties
            queryObject, // i.e. vsdk_warranty__c
            integrationPointApiName,
            true);
    vSDKIntRulesHelper.setIntTransProcessedStatuses(
            failedIdList,
            connectionName, // i.e. vsdk_connection_to_warranties
            queryObject, // i.e. vsdk_warranty__c
            integrationPointApiName,
            false);
    ...
}