diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
index d085c54a97d5..05684a4e9408 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/pom.xml
@@ -38,7 +38,11 @@
com.fasterxml.jackson.core
jackson-databind
- compile
+ provided
+
+
+ org.apache.nifi
+ nifi-oauth2-provider-api
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/AuthorizationScheme.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/AuthorizationScheme.java
index c62e1f22d334..feb31b44c69b 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/AuthorizationScheme.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/AuthorizationScheme.java
@@ -22,7 +22,8 @@ public enum AuthorizationScheme implements DescribedValue {
NONE("None", "No authorization scheme."),
PKI("PKI", "Mutual TLS with PKI certificate authorization scheme."),
BASIC("Basic", "Basic authorization scheme."),
- API_KEY("API Key", "API key authorization scheme.");
+ API_KEY("API Key", "API key authorization scheme."),
+ JWT("JWT", "JWT realm scheme.");
private final String displayName;
private final String description;
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
index 77e819332a48..b1c64cc12d2a 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
@@ -22,6 +22,7 @@
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.VerifiableControllerService;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec;
@@ -50,6 +51,7 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
.identifiesControllerService(SSLContextProvider.class)
.addValidator(Validator.VALID)
.build();
+
PropertyDescriptor PROXY_CONFIGURATION_SERVICE = ProxyConfiguration.createProxyConfigPropertyDescriptor(ProxySpec.HTTP);
PropertyDescriptor AUTHORIZATION_SCHEME = new PropertyDescriptor.Builder()
@@ -62,12 +64,41 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+ PropertyDescriptor OAUTH2_ACCESS_TOKEN_PROVIDER = new PropertyDescriptor.Builder()
+ .name("el-cs-oauth2-token-provider")
+ .displayName("OAuth2 Access Token Provider")
+ .description("The OAuth2 Access Token Provider used to provide JWTs for Bearer Token Authorization with Elasticsearch.")
+ .dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.JWT)
+ .required(true)
+ .identifiesControllerService(OAuth2AccessTokenProvider.class)
+ .addValidator(Validator.VALID)
+ .build();
+
+ PropertyDescriptor JWT_SHARED_SECRET = new PropertyDescriptor.Builder()
+ .name("jwt-shared-secret")
+ .displayName("JWT Shared Secret")
+ .description("JWT realm Shared Secret.")
+ .dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.JWT)
+ .required(true)
+ .sensitive(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ PropertyDescriptor RUN_AS_USER = new PropertyDescriptor.Builder()
+ .name("el-cs-run-as-user")
+ .displayName("Run As User")
+ .description("The username to impersonate within Elasticsearch.")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
.name("el-cs-username")
.displayName("Username")
.description("The username to use with XPack security.")
.dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.BASIC)
- .required(false)
+ .required(true)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@@ -77,7 +108,7 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
.displayName("Password")
.description("The password to use with XPack security.")
.dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.BASIC)
- .required(false)
+ .required(true)
.sensitive(true)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -88,7 +119,7 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
.displayName("API Key ID")
.description("Unique identifier of the API key.")
.dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.API_KEY)
- .required(false)
+ .required(true)
.sensitive(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@@ -98,7 +129,7 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
.displayName("API Key")
.description("Encoded API key.")
.dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.API_KEY)
- .required(false)
+ .required(true)
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@@ -220,7 +251,7 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
.name("el-cs-sniff-failure")
.displayName("Sniff on Failure")
.description("Enable sniffing on failure, meaning that after each failure the Elasticsearch nodes list gets updated " +
- "straightaway rather than at the following ordinary sniffing round")
+ "straight away rather than at the following ordinary sniffing round")
.dependsOn(SNIFF_CLUSTER_NODES, "true")
.allowableValues("true", "false")
.defaultValue("false")
@@ -370,7 +401,7 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
/**
* Perform a search using the JSON DSL.
*
- * @param query A JSON string reprensenting the query.
+ * @param query A JSON string representing the query.
* @param index The index to target. Optional.
* @param type The type to target. Optional. Will not be used in future versions of Elasticsearch.
* @param requestParameters A collection of URL request parameters. Optional.
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
index dc2ab871dfea..3eb6532e47cf 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
@@ -47,6 +47,10 @@
org.apache.nifi
nifi-proxy-configuration-api
+
+ org.apache.nifi
+ nifi-oauth2-provider-api
+
org.apache.nifi
nifi-elasticsearch-client-service-api
@@ -83,23 +87,11 @@
jackson-annotations
provided
-
- commons-io
- commons-io
-
-
- org.apache.commons
- commons-compress
-
com.github.stephenc.findbugs
findbugs-annotations
1.3.9-1
-
- org.apache.commons
- commons-lang3
-
org.opentest4j
opentest4j
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
index b3e6f7a1f46d..9bb341473303 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
@@ -45,6 +45,7 @@
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
@@ -56,6 +57,7 @@
import org.elasticsearch.client.Node;
import org.elasticsearch.client.NodeSelector;
import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
@@ -110,6 +112,9 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
PASSWORD,
API_KEY_ID,
API_KEY,
+ JWT_SHARED_SECRET,
+ OAUTH2_ACCESS_TOKEN_PROVIDER,
+ RUN_AS_USER,
PROP_SSL_CONTEXT_SERVICE,
PROXY_CONFIGURATION_SERVICE,
CONNECT_TIMEOUT,
@@ -127,6 +132,8 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
SNIFFER_FAILURE_DELAY
);
+ private OAuth2AccessTokenProvider oAuth2AccessTokenProvider;
+
private RestClient client;
private Sniffer sniffer;
@@ -157,12 +164,6 @@ protected Collection customValidate(final ValidationContext va
final AuthorizationScheme authorizationScheme = validationContext.getProperty(AUTHORIZATION_SCHEME).asAllowableValue(AuthorizationScheme.class);
- final boolean usernameSet = validationContext.getProperty(USERNAME).isSet();
- final boolean passwordSet = validationContext.getProperty(PASSWORD).isSet();
-
- final boolean apiKeyIdSet = validationContext.getProperty(API_KEY_ID).isSet();
- final boolean apiKeySet = validationContext.getProperty(API_KEY).isSet();
-
final SSLContextProvider sslContextProvider = validationContext.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextProvider.class);
if (authorizationScheme == AuthorizationScheme.PKI && (sslContextProvider == null)) {
@@ -173,18 +174,6 @@ protected Collection customValidate(final ValidationContext va
);
}
- if (usernameSet && !passwordSet) {
- addAuthorizationPropertiesValidationIssue(results, USERNAME, PASSWORD);
- } else if (passwordSet && !usernameSet) {
- addAuthorizationPropertiesValidationIssue(results, PASSWORD, USERNAME);
- }
-
- if (apiKeyIdSet && !apiKeySet) {
- addAuthorizationPropertiesValidationIssue(results, API_KEY_ID, API_KEY);
- } else if (apiKeySet && !apiKeyIdSet) {
- addAuthorizationPropertiesValidationIssue(results, API_KEY, API_KEY_ID);
- }
-
final boolean sniffClusterNodes = validationContext.getProperty(SNIFF_CLUSTER_NODES).asBoolean();
final boolean sniffOnFailure = validationContext.getProperty(SNIFF_ON_FAILURE).asBoolean();
if (sniffOnFailure && !sniffClusterNodes) {
@@ -195,23 +184,17 @@ protected Collection customValidate(final ValidationContext va
return results;
}
- private void addAuthorizationPropertiesValidationIssue(final List results, final PropertyDescriptor presentProperty, final PropertyDescriptor missingProperty) {
- results.add(new ValidationResult.Builder().subject(missingProperty.getName()).valid(false)
- .explanation(String.format("if '%s' is then '%s' must be set.", presentProperty.getDisplayName(), missingProperty.getDisplayName()))
- .build()
- );
- }
-
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws InitializationException {
try {
this.client = setupClient(context);
this.sniffer = setupSniffer(context, this.client);
- responseCharset = Charset.forName(context.getProperty(CHARSET).getValue());
+ this.responseCharset = Charset.forName(context.getProperty(CHARSET).getValue());
+
+ this.oAuth2AccessTokenProvider = context.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
// re-create the ObjectMapper in case the SUPPRESS_NULLS property has changed - the JsonInclude settings aren't dynamic
createObjectMapper(context);
-
} catch (final Exception ex) {
getLogger().error("Could not initialize ElasticSearch client.", ex);
throw new InitializationException(ex);
@@ -264,10 +247,11 @@ public List verify(final ConfigurationContext context,
clientSetupResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL);
// try to fetch the Elasticsearch root endpoint (system summary)
- verifyRootConnection(verifyClient, connectionResult, warningsResult);
+ final OAuth2AccessTokenProvider tokenProvider = context.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+ verifyRootConnection(verifyClient, tokenProvider, connectionResult, warningsResult);
// try sniffing for cluster nodes
- verifySniffer(context, verifyClient, snifferResult);
+ verifySniffer(context, verifyClient, tokenProvider, snifferResult);
} catch (final MalformedURLException mue) {
clientSetupResult.outcome(ConfigVerificationResult.Outcome.FAILED)
.explanation("Incorrect/invalid " + ElasticSearchClientService.HTTP_HOSTS.getDisplayName());
@@ -303,7 +287,7 @@ public List verify(final ConfigurationContext context,
return results;
}
- private void verifySniffer(final ConfigurationContext context, final RestClient verifyClient, final ConfigVerificationResult.Builder snifferResult) {
+ private void verifySniffer(final ConfigurationContext context, final RestClient verifyClient, final OAuth2AccessTokenProvider tokenProvider, final ConfigVerificationResult.Builder snifferResult) {
try (final Sniffer verifySniffer = setupSniffer(context, verifyClient)) {
if (verifySniffer != null) {
final List originalNodes = verifyClient.getNodes();
@@ -317,7 +301,7 @@ private void verifySniffer(final ConfigurationContext context, final RestClient
nodes.forEach(n -> {
try {
verifyClient.setNodes(Collections.singletonList(n));
- final List warnings = getElasticsearchRoot(verifyClient);
+ final List warnings = getElasticsearchRoot(verifyClient, tokenProvider);
successfulInstances.getAndIncrement();
if (!warnings.isEmpty()) {
warningInstances.getAndIncrement();
@@ -351,17 +335,20 @@ private void verifySniffer(final ConfigurationContext context, final RestClient
}
}
- private List getElasticsearchRoot(final RestClient verifyClient) throws IOException {
- final Response response = verifyClient.performRequest(new Request("GET", "/"));
+ private List getElasticsearchRoot(final RestClient verifyClient, final OAuth2AccessTokenProvider tokenProvider) throws IOException {
+ final Request request = addJWTAuthorizationHeader(new Request("GET", "/"), tokenProvider);
+ final Response response = verifyClient.performRequest(request);
final List warnings = parseResponseWarningHeaders(response);
+ // ensure the response can be parsed without exception
parseResponse(response);
return warnings;
}
- private void verifyRootConnection(final RestClient verifyClient, final ConfigVerificationResult.Builder connectionResult, final ConfigVerificationResult.Builder warningsResult) {
+ private void verifyRootConnection(final RestClient verifyClient, final OAuth2AccessTokenProvider tokenProvider,
+ final ConfigVerificationResult.Builder connectionResult, final ConfigVerificationResult.Builder warningsResult) {
try {
- final List warnings = getElasticsearchRoot(verifyClient);
+ final List warnings = getElasticsearchRoot(verifyClient, tokenProvider);
connectionResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL);
if (warnings.isEmpty()) {
@@ -439,9 +426,13 @@ private RestClientBuilder addAuthAndProxy(final ConfigurationContext context, fi
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+ final String runAsUser = context.getProperty(RUN_AS_USER).evaluateAttributeExpressions().getValue();
+
final String apiKeyId = context.getProperty(API_KEY_ID).getValue();
final String apiKey = context.getProperty(API_KEY).getValue();
+ final String jwtSharedSecret = context.getProperty(JWT_SHARED_SECRET).getValue();
+
final SSLContext sslContext = getSSLContext(context);
final ProxyConfigurationService proxyConfigurationService = context.getProperty(PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
@@ -459,6 +450,12 @@ private RestClientBuilder addAuthAndProxy(final ConfigurationContext context, fi
if (AuthorizationScheme.API_KEY == authorizationScheme && apiKeyId != null && apiKey != null) {
defaultHeaders.add(createApiKeyAuthorizationHeader(apiKeyId, apiKey));
}
+ if (AuthorizationScheme.JWT == authorizationScheme && jwtSharedSecret != null) {
+ defaultHeaders.add(createSharedSecretHeader(jwtSharedSecret));
+ }
+ if (runAsUser != null) {
+ defaultHeaders.add(createRunAsUserHeader(runAsUser));
+ }
if (!defaultHeaders.isEmpty()) {
builder.setDefaultHeaders(defaultHeaders.toArray(new Header[0]));
}
@@ -524,6 +521,23 @@ private BasicHeader createApiKeyAuthorizationHeader(final String apiKeyId, final
return new BasicHeader("Authorization", "ApiKey " + apiKeyAuth);
}
+ private BasicHeader createSharedSecretHeader(final String jwtSharedSecret) {
+ return new BasicHeader("ES-Client-Authentication", "sharedsecret " + jwtSharedSecret);
+ }
+
+ private BasicHeader createRunAsUserHeader(final String runAsUser) {
+ return new BasicHeader("es-security-runas-user", runAsUser);
+ }
+
+ private Request addJWTAuthorizationHeader(final Request request, final OAuth2AccessTokenProvider tokenProvider) {
+ if (tokenProvider != null) {
+ final RequestOptions.Builder requestOptionsBuilder = RequestOptions.DEFAULT.toBuilder();
+ requestOptionsBuilder.addHeader("Authorization", "Bearer " + tokenProvider.getAccessDetails().getAccessToken());
+ request.setOptions(requestOptionsBuilder.build());
+ }
+ return request;
+ }
+
private Sniffer setupSniffer(final ConfigurationContext context, final RestClient restClient) {
final boolean sniffClusterNodes = context.getProperty(SNIFF_CLUSTER_NODES).asBoolean();
final int snifferIntervalMillis = context.getProperty(SNIFFER_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
@@ -1016,7 +1030,7 @@ public String getTransitUrl(final String index, final String type) {
}
private Response performRequest(final String method, final String endpoint, final Map parameters, final HttpEntity entity) throws IOException {
- final Request request = new Request(method, endpoint);
+ final Request request = addJWTAuthorizationHeader(new Request(method, endpoint), oAuth2AccessTokenProvider);
if (parameters != null && !parameters.isEmpty()) {
request.addParameters(parameters);
}
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
index 38b39975628b..b9f0d7dd23c0 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java
@@ -209,12 +209,12 @@ private Record getById(final String _id, final Map context) thro
return null;
}
- final Map source = (Map) response.getHits().get(0).get("_source");
+ final Map source = (Map) response.getHits().getFirst().get("_source");
final RecordSchema toUse = getSchema(context, source, null);
Record record = new MapRecord(toUse, source);
- if (recordPathMappings.size() > 0) {
+ if (!recordPathMappings.isEmpty()) {
record = applyMappings(record, source);
}
@@ -239,7 +239,7 @@ private Map buildQuery(final Map coordinates) {
put(e.getKey(), e.getValue());
}});
}
- }}).collect(Collectors.toList())
+ }}).toList()
);
}});
}};
@@ -256,10 +256,10 @@ private Record getByQuery(final Map query, final Map source = (Map) response.getHits().get(0).get("_source");
+ final Map source = (Map) response.getHits().getFirst().get("_source");
final RecordSchema toUse = getSchema(context, source, null);
Record record = new MapRecord(toUse, source);
- if (recordPathMappings.size() > 0) {
+ if (!recordPathMappings.isEmpty()) {
record = applyMappings(record, source);
}
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
index 2e7922084719..fe6c637acc5d 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.java
@@ -263,7 +263,7 @@ private void assertBasicSearch(final Map requestParameters) thro
"four", 4, "five", 5)
.build();
- buckets.forEach( (aggRes) -> {
+ buckets.forEach(aggRes -> {
final String key = (String) aggRes.get("key");
final Integer docCount = (Integer) aggRes.get("doc_count");
assertEquals(expected.get(key), docCount, String.format("%s did not match.", key));
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.java
index 8129e999d5bc..3fc0b5076f89 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.java
@@ -49,6 +49,7 @@ class ElasticSearchLookupService_IT extends AbstractElasticsearch_IT {
private ElasticSearchLookupService lookupService;
+ @Override
@BeforeEach
void before() throws Exception {
super.before();
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchClientServiceImplTest.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchClientServiceImplTest.java
index 9fdbcee8cd74..98fc5cd674d5 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchClientServiceImplTest.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchClientServiceImplTest.java
@@ -21,6 +21,7 @@
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl;
import org.apache.nifi.elasticsearch.TestControllerServiceProcessor;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.SSLContextProvider;
import org.apache.nifi.util.TestRunner;
@@ -72,14 +73,11 @@ void testValidateBasicAuth() {
runner.assertValid(service);
runner.removeProperty(service, ElasticSearchClientService.PASSWORD);
- assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.USERNAME, ElasticSearchClientService.PASSWORD);
-
- runner.removeProperty(service, ElasticSearchClientService.USERNAME);
- runner.assertValid(service);
+ assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.PASSWORD);
runner.setProperty(service, ElasticSearchClientService.PASSWORD, "password");
runner.removeProperty(service, ElasticSearchClientService.USERNAME);
- assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.PASSWORD, ElasticSearchClientService.USERNAME);
+ assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.USERNAME);
}
@Test
@@ -90,14 +88,11 @@ void testValidateApiKeyAuth() {
runner.assertValid(service);
runner.removeProperty(service, ElasticSearchClientService.API_KEY_ID);
- assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.API_KEY, ElasticSearchClientService.API_KEY_ID);
-
- runner.removeProperty(service, ElasticSearchClientService.API_KEY);
- runner.assertValid(service);
+ assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.API_KEY_ID);
runner.setProperty(service, ElasticSearchClientService.API_KEY_ID, "api-key-id");
runner.removeProperty(service, ElasticSearchClientService.API_KEY);
- assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.API_KEY_ID, ElasticSearchClientService.API_KEY);
+ assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.API_KEY);
}
@Test
@@ -114,9 +109,26 @@ void testValidatePkiAuth() throws InitializationException {
assertPKIAuthorizationValidationErrorMessage();
}
- private void assertAuthorizationPropertyValidationErrorMessage(final PropertyDescriptor presentProperty, final PropertyDescriptor missingProperty) {
+ @Test
+ void testValidateJwtAuth() throws InitializationException {
+ runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.JWT);
+ runner.setProperty(service, ElasticSearchClientService.JWT_SHARED_SECRET, "jwt-shared-secret");
+ assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.OAUTH2_ACCESS_TOKEN_PROVIDER);
+
+ final OAuth2AccessTokenProvider oAuth2AccessTokenProvider = mock(OAuth2AccessTokenProvider.class);
+ when(oAuth2AccessTokenProvider.getIdentifier()).thenReturn("oauth2-access-token-provider");
+ runner.addControllerService("oauth2-access-token-provider", oAuth2AccessTokenProvider);
+ runner.setProperty(service, ElasticSearchClientService.OAUTH2_ACCESS_TOKEN_PROVIDER, "oauth2-access-token-provider");
+ runner.assertValid(service);
+
+ runner.removeProperty(service, ElasticSearchClientService.JWT_SHARED_SECRET);
+ assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.JWT_SHARED_SECRET);
+ }
+
+ private void assertAuthorizationPropertyValidationErrorMessage(final PropertyDescriptor missingProperty) {
final AssertionFailedError afe = assertThrows(AssertionFailedError.class, () -> runner.assertValid(service));
- assertTrue(afe.getMessage().contains(String.format("if '%s' is then '%s' must be set.", presentProperty.getDisplayName(), missingProperty.getDisplayName())));
+ final String expectedMessage = String.format("%s is required", missingProperty.getDisplayName());
+ assertTrue(afe.getMessage().contains(expectedMessage), String.format("Validation error message \"%s\" does not contain \"%s\"", afe.getMessage(), expectedMessage));
}
private void assertPKIAuthorizationValidationErrorMessage() {
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchStringLookupServiceTest.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchStringLookupServiceTest.java
index 16971faabfa6..7e19c90ca8ac 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchStringLookupServiceTest.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/java/org/apache/nifi/elasticsearch/unit/ElasticSearchStringLookupServiceTest.java
@@ -35,7 +35,7 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class ElasticSearchStringLookupServiceTest {
+class ElasticSearchStringLookupServiceTest {
private ElasticSearchClientService mockClientService;
private ElasticSearchStringLookupService lookupService;
@@ -56,8 +56,9 @@ public void setup() throws Exception {
runner.enableControllerService(lookupService);
}
+ @SuppressWarnings("unchecked")
@Test
- public void simpleLookupTest() throws Exception {
+ void simpleLookupTest() throws Exception {
Map coordinates = new HashMap<>();
coordinates.put(ElasticSearchStringLookupService.ID, "12345");
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
index ed7fda19e878..2a1b71a43af2 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
@@ -72,8 +72,9 @@ language governing permissions and limitations under the License. -->
test
- commons-io
- commons-io
+ org.apache.nifi
+ nifi-oauth2-provider-api
+ test
com.fasterxml.jackson.core
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/pom.xml b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/pom.xml
index 1635b30ac7ac..cf760f6a9e51 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/pom.xml
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/pom.xml
@@ -21,16 +21,6 @@ language governing permissions and limitations under the License. -->
jar
-
- com.fasterxml.jackson.core
- jackson-databind
- compile
-
-
- com.fasterxml.jackson.core
- jackson-core
- compile
-
com.github.docker-java
docker-java-api
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
index ae3db3cc072b..163b8fad9961 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
@@ -52,7 +52,7 @@
public abstract class AbstractElasticsearchITBase {
// default Elasticsearch version should (ideally) match that in the nifi-elasticsearch-bundle#pom.xml for the integration-tests profile
protected static final DockerImageName IMAGE = DockerImageName
- .parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.15.1"));
+ .parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.17.0"));
protected static final String ELASTIC_USER_PASSWORD = System.getProperty("elasticsearch.elastic_user.password", RandomStringUtils.randomAlphanumeric(10, 20));
private static final int PORT = 9200;
protected static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = new ElasticsearchContainer(IMAGE)
diff --git a/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml b/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
index 2f5a5524518e..dd7f1c41539a 100644
--- a/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
+++ b/nifi-extension-bundles/nifi-elasticsearch-bundle/pom.xml
@@ -114,7 +114,7 @@ language governing permissions and limitations under the License. -->
elasticsearch7
- 7.17.23
+ 7.17.26