Skip to content

Commit

Permalink
Re-read Input FlowFile Records individually only if there are errors to output for PutElasticsearchRecord
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSamo632 committed Feb 26, 2025
1 parent 57bc984 commit b8d8342
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -521,32 +521,6 @@ private void removeResultRecordFlowFiles(final List<FlowFile> results, final Pro
results.clear();
}

/**
 * Re-reads the input FlowFile to recover the original Records for every index in the current
 * batch that produced a bulk indexing error.
 *
 * <p>The input FlowFile may contain multiple batches; the reader is first advanced past all
 * Records belonging to earlier batches before the current batch is scanned.</p>
 *
 * @param bundle the bulk operation holding the current batch's original Records (used for its size)
 * @param session the ProcessSession used to open a fresh read stream over the input FlowFile
 * @param input the input FlowFile to re-read
 * @param errors map of batch-relative Record index to the error details returned by Elasticsearch
 * @param indexOperationParameters supplies the configured batch size
 * @param batch zero-based index of the current batch within the input FlowFile
 * @return map of batch-relative Record index to the corresponding input Record, for error indices only
 * @throws IOException if the FlowFile content cannot be read
 * @throws SchemaNotFoundException if the Record Reader cannot resolve a schema
 * @throws MalformedRecordException if a Record cannot be parsed
 */
private Map<Integer, Record> getErrorInputRecords(final BulkOperation bundle, final ProcessSession session, final FlowFile input, final Map<Integer, Map<String, Object>> errors,
                                                  final IndexOperationParameters indexOperationParameters, final int batch)
        throws IOException, SchemaNotFoundException, MalformedRecordException {
    try (final InputStream inStream = session.read(input);
         final RecordReader reader = readerFactory.createRecordReader(input, inStream, getLogger())) {

        // advance the reader past every Record belonging to batches before this one
        final int recordsToSkip = batch * indexOperationParameters.getBatchSize();
        for (int skipped = 0; skipped < recordsToSkip; skipped++) {
            reader.nextRecord();
        }

        // read the current batch in order, keeping only the Records whose index had an error
        final Map<Integer, Record> errorInputRecords = new HashMap<>(errors.size(), 1);
        final int batchRecordCount = bundle.getOriginalRecords().size();
        for (int index = 0; index < batchRecordCount; index++) {
            final Record inputRecord = reader.nextRecord();
            if (errors.containsKey(index)) {
                errorInputRecords.put(index, inputRecord);
            }
        }

        return errorInputRecords;
    } catch (final IOException | SchemaNotFoundException | MalformedRecordException ex) {
        // log here for operator visibility, then rethrow so the caller can route the FlowFile appropriately
        getLogger().error("Unable to read input records for errors", ex);
        throw ex;
    }
}

private ResponseDetails indexDocuments(final BulkOperation bundle, final ProcessSession session, final FlowFile input,
final IndexOperationParameters indexOperationParameters, final int batch)
throws IOException, SchemaNotFoundException, MalformedRecordException {
Expand All @@ -561,21 +535,23 @@ private ResponseDetails indexDocuments(final BulkOperation bundle, final Process
final int numSuccessful = response.getItems() == null ? 0 : response.getItems().size() - numErrors;
final Map<String, Output> outputs = new HashMap<>();

final Map<Integer, Record> errorInputRecords;
if (numErrors > 0) {
errorInputRecords = getErrorInputRecords(bundle, session, input, errors, indexOperationParameters, batch);
} else {
errorInputRecords = Collections.emptyMap();
}
try (final InputStream inStream = session.read(input);
final RecordReader inputReader = readerFactory.createRecordReader(input, inStream, getLogger())) {

// if there are errors present, skip through the input FlowFile to the current batch of records
if (numErrors > 0) {
for (int r = 0; r < batch * indexOperationParameters.getBatchSize(); r++) {
inputReader.nextRecord();
}
}

try {
for (int o = 0; o < bundle.getOriginalRecords().size(); o++) {
final String type;
final Relationship relationship;
final Map<String, Object> error;
final Record outputRecord;
final RecordSchema recordSchema;
if (errors.containsKey(o)) {
if (numErrors > 0 && errors.containsKey(o)) {
relationship = REL_ERRORS;
error = errors.get(o);
if (groupBulkErrors) {
Expand All @@ -587,14 +563,18 @@ private ResponseDetails indexDocuments(final BulkOperation bundle, final Process
} else {
type = OUTPUT_TYPE_ERROR;
}
outputRecord = errorInputRecords.get(o);
outputRecord = inputReader.nextRecord();
recordSchema = outputRecord.getSchema();
} else {
relationship = REL_SUCCESSFUL;
error = null;
type = OUTPUT_TYPE_SUCCESS;
outputRecord = bundle.getOriginalRecords().get(o);
recordSchema = bundle.getSchema();
// skip the associated Input Record for this successful Record
if (numErrors > 0) {
inputReader.nextRecord();
}
}
final Output output = getOutputByType(outputs, type, session, relationship, input, recordSchema);
output.write(outputRecord, error);
Expand All @@ -603,7 +583,7 @@ private ResponseDetails indexDocuments(final BulkOperation bundle, final Process
for (final Output output : outputs.values()) {
output.transfer(session);
}
} catch (final IOException | SchemaNotFoundException ex) {
} catch (final IOException | SchemaNotFoundException | MalformedRecordException ex) {
getLogger().error("Unable to write error/successful records", ex);
outputs.values().forEach(o -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ void simpleTestWithMockReader() throws Exception {
runner.addControllerService("mockReader", mockReader);
runner.setProperty(PutElasticsearchRecord.RECORD_READER, "mockReader");
runner.enableControllerService(mockReader);
runner.setAllowRecursiveReads(true);

basicTest(0, 0, 1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
public abstract class AbstractElasticsearchITBase {
// default Elasticsearch version should (ideally) match that in the nifi-elasticsearch-bundle#pom.xml for the integration-tests profile
protected static final DockerImageName IMAGE = DockerImageName
.parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.17.1"));
.parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.17.2"));
protected static final String ELASTIC_USER_PASSWORD = System.getProperty("elasticsearch.elastic_user.password", RandomStringUtils.randomAlphanumeric(10, 20));
private static final int PORT = 9200;
protected static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = new ElasticsearchContainer(IMAGE)
Expand Down
2 changes: 1 addition & 1 deletion nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ language governing permissions and limitations under the License. -->
</modules>

<properties>
<elasticsearch.client.version>8.17.1</elasticsearch.client.version>
<elasticsearch.client.version>8.17.2</elasticsearch.client.version>
</properties>

<dependencyManagement>
Expand Down

0 comments on commit b8d8342

Please sign in to comment.