From 28d69a6009598cc2289ee4e7e021ba7da2c33dd5 Mon Sep 17 00:00:00 2001 From: Chris Sampson Date: Wed, 26 Feb 2025 20:58:42 +0000 Subject: [PATCH] Re-read Input FlowFile Records individually only if there are errors to output for PutElasticsearchRecord --- .../elasticsearch/PutElasticsearchRecord.java | 52 ++++++------------- .../PutElasticsearchRecordTest.java | 1 + 2 files changed, 17 insertions(+), 36 deletions(-) diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java index 94a6e5b54ca5..507f0315117a 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java @@ -521,32 +521,6 @@ private void removeResultRecordFlowFiles(final List results, final Pro results.clear(); } - private Map getErrorInputRecords(final BulkOperation bundle, final ProcessSession session, final FlowFile input, final Map> errors, - final IndexOperationParameters indexOperationParameters, final int batch) - throws IOException, SchemaNotFoundException, MalformedRecordException { - try (final InputStream inStream = session.read(input); - final RecordReader reader = readerFactory.createRecordReader(input, inStream, getLogger())) { - - // skip through the input FlowFile to the current batch of records - for (int r = 0; r < batch * indexOperationParameters.getBatchSize(); r++) { - reader.nextRecord(); - } - - final Map errorInputRecords = new HashMap<>(errors.size(), 1); - for (int i = 0; i < bundle.getOriginalRecords().size(); i++) { - final Record inputRecord = reader.nextRecord(); - if (errors.containsKey(i)) { - errorInputRecords.put(i, inputRecord); - } - } - - return errorInputRecords; - } catch (final IOException | SchemaNotFoundException | MalformedRecordException ex) { - getLogger().error("Unable to read input records for errors", ex); - throw ex; - } - } - private ResponseDetails indexDocuments(final BulkOperation bundle, final ProcessSession session, final FlowFile input, final IndexOperationParameters indexOperationParameters, final int batch) throws IOException, SchemaNotFoundException, MalformedRecordException { @@ -561,21 +535,23 @@ private ResponseDetails indexDocuments(final BulkOperation bundle, final Process final int numSuccessful = response.getItems() == null ? 0 : response.getItems().size() - numErrors; final Map outputs = new HashMap<>(); - final Map errorInputRecords; - if (numErrors > 0) { - errorInputRecords = getErrorInputRecords(bundle, session, input, errors, indexOperationParameters, batch); - } else { - errorInputRecords = Collections.emptyMap(); - } + try (final InputStream inStream = session.read(input); + final RecordReader inputReader = readerFactory.createRecordReader(input, inStream, getLogger())) { + + // if there are errors present, skip through the input FlowFile to the current batch of records + if (numErrors > 0) { + for (int r = 0; r < batch * indexOperationParameters.getBatchSize(); r++) { + inputReader.nextRecord(); + } + } - try { for (int o = 0; o < bundle.getOriginalRecords().size(); o++) { final String type; final Relationship relationship; final Map error; final Record outputRecord; final RecordSchema recordSchema; - if (errors.containsKey(o)) { + if (numErrors > 0 && errors.containsKey(o)) { relationship = REL_ERRORS; error = errors.get(o); if (groupBulkErrors) { @@ -587,7 +563,7 @@ private ResponseDetails indexDocuments(final BulkOperation bundle, final Process } else { type = OUTPUT_TYPE_ERROR; } - outputRecord = errorInputRecords.get(o); + outputRecord = inputReader.nextRecord(); recordSchema = outputRecord.getSchema(); } else { relationship = REL_SUCCESSFUL; @@ -595,6 +571,10 @@ private ResponseDetails indexDocuments(final BulkOperation bundle, final Process type = OUTPUT_TYPE_SUCCESS; outputRecord = bundle.getOriginalRecords().get(o); recordSchema = bundle.getSchema(); + // skip the associated Input Record for this successful Record + if (numErrors > 0) { + inputReader.nextRecord(); + } } final Output output = getOutputByType(outputs, type, session, relationship, input, recordSchema); output.write(outputRecord, error); @@ -603,7 +583,7 @@ private ResponseDetails indexDocuments(final BulkOperation bundle, final Process for (final Output output : outputs.values()) { output.transfer(session); } - } catch (final IOException | SchemaNotFoundException ex) { + } catch (final IOException | SchemaNotFoundException | MalformedRecordException ex) { getLogger().error("Unable to write error/successful records", ex); outputs.values().forEach(o -> { try { diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.java index b17d20010c00..4305c2b240ec 100644 --- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.java +++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.java @@ -288,6 +288,7 @@ void simpleTestWithMockReader() throws Exception { runner.addControllerService("mockReader", mockReader); runner.setProperty(PutElasticsearchRecord.RECORD_READER, "mockReader"); runner.enableControllerService(mockReader); + runner.setAllowRecursiveReads(true); basicTest(0, 0, 1); }