diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java index 578b1b32..92f558a9 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java @@ -46,6 +46,7 @@ import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.RETRIES_ON_CONFLICT_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.SOCKET_TIMEOUT; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION; import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM; @@ -129,4 +130,8 @@ public List getHosts() { public Optional getParallelism() { return config.getOptional(SINK_PARALLELISM); } + + public int getRetriesOnConflict() { + return config.getOptional(RETRIES_ON_CONFLICT_OPTION).orElse(0); + } } diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java index 10ea0ae2..d273c11a 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java @@ -145,4 +145,12 @@ public class ElasticsearchConnectorOptions { .enumType(DeliveryGuarantee.class) .defaultValue(DeliveryGuarantee.AT_LEAST_ONCE) .withDescription("Optional delivery guarantee when committing."); + + public static final ConfigOption RETRIES_ON_CONFLICT_OPTION = + ConfigOptions.key("sink.retries-on-conflict") + .intType() + .defaultValue(0) + .withDescription( + "Sets the number of retries of a version conflict occurs " + + "because the document was updated between getting it and updating it. Defaults to 0."); } diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java index 0fd389bd..7828d4f9 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java @@ -127,7 +127,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { format, XContentType.JSON, documentType, - createKeyExtractor()); + createKeyExtractor(), + config.getRetriesOnConflict()); ElasticsearchSinkBuilderBase builder = builderSupplier.get(); diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java index ed5e7f73..daf43a4e 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java @@ -63,6 +63,7 @@ import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.RETRIES_ON_CONFLICT_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.SOCKET_TIMEOUT; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION; import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM; @@ -225,7 +226,8 @@ public Set> optionalOptions() { DELIVERY_GUARANTEE_OPTION, PASSWORD_OPTION, USERNAME_OPTION, - SINK_PARALLELISM) + SINK_PARALLELISM, + RETRIES_ON_CONFLICT_OPTION) .collect(Collectors.toSet()); } diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/RowElasticsearchEmitter.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/RowElasticsearchEmitter.java index bddc6cb1..51c4c336 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/RowElasticsearchEmitter.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/RowElasticsearchEmitter.java @@ -50,18 +50,21 @@ class RowElasticsearchEmitter implements ElasticsearchEmitter { private final XContentType contentType; @Nullable private final String documentType; private final Function createKey; + private final int retiesOnConflict; public RowElasticsearchEmitter( IndexGenerator indexGenerator, SerializationSchema serializationSchema, XContentType contentType, @Nullable String documentType, - Function createKey) { + Function createKey, + int retriesOnConflict) { this.indexGenerator = checkNotNull(indexGenerator); this.serializationSchema = checkNotNull(serializationSchema); this.contentType = checkNotNull(contentType); this.documentType = documentType; this.createKey = checkNotNull(createKey); + this.retiesOnConflict = retriesOnConflict; } @Override @@ -109,7 +112,8 @@ private void processUpsert(RowData row, RequestIndexer indexer) { final UpdateRequest updateRequest = new UpdateRequest(indexGenerator.generate(row), documentType, key) .doc(document, contentType) - .upsert(document, contentType); + .upsert(document, contentType) + .retryOnConflict(retiesOnConflict); indexer.add(updateRequest); } else { final IndexRequest indexRequest = diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java index 04c76333..5a698291 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java @@ -38,6 +38,7 @@ import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_INTERVAL_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.FAILURE_HANDLER_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.RETRIES_ON_CONFLICT_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION; /** Accessor methods to elasticsearch options. */ @@ -110,6 +111,10 @@ public Optional getPassword() { return config.getOptional(PASSWORD_OPTION); } + public int getRetriesOnConflict() { + return config.getOptional(RETRIES_ON_CONFLICT_OPTION).get(); + } + public boolean isBulkFlushBackoffEnabled() { return config.get(BULK_FLUSH_BACKOFF_TYPE_OPTION) != ElasticsearchConnectorOptions.BackOffType.DISABLED; diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java index 4838b035..ab1874a2 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions.java @@ -152,6 +152,14 @@ public class ElasticsearchConnectorOptions { "The format must produce a valid JSON document. " + "Please refer to the documentation on formats for more details."); + public static final ConfigOption RETRIES_ON_CONFLICT_OPTION = + ConfigOptions.key("sink.retries-on-conflict") + .intType() + .defaultValue(0) + .withDescription( + "Sets the number of retries of a version conflict occurs " + + "because the document was updated between getting it and updating it. Defaults to 0."); + // -------------------------------------------------------------------------------------------- // Enums // -------------------------------------------------------------------------------------------- diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java index 48762522..51451e02 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java @@ -51,6 +51,7 @@ class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction private final XContentType contentType; private final RequestFactory requestFactory; private final Function createKey; + private final int retryOnConflictNum; public RowElasticsearchSinkFunction( IndexGenerator indexGenerator, @@ -58,13 +59,15 @@ public RowElasticsearchSinkFunction( SerializationSchema serializationSchema, XContentType contentType, RequestFactory requestFactory, - Function createKey) { + Function createKey, + int retryOnConflictNum) { this.indexGenerator = Preconditions.checkNotNull(indexGenerator); this.docType = docType; this.serializationSchema = Preconditions.checkNotNull(serializationSchema); this.contentType = Preconditions.checkNotNull(contentType); this.requestFactory = Preconditions.checkNotNull(requestFactory); this.createKey = Preconditions.checkNotNull(createKey); + this.retryOnConflictNum = retryOnConflictNum; } @Override @@ -95,8 +98,14 @@ private void processUpsert(RowData row, RequestIndexer indexer) { final String key = createKey.apply(row); if (key != null) { final UpdateRequest updateRequest = - requestFactory.createUpdateRequest( - indexGenerator.generate(row), docType, key, contentType, document); + requestFactory + .createUpdateRequest( + indexGenerator.generate(row), + docType, + key, + contentType, + document) + .retryOnConflict(retryOnConflictNum); indexer.add(updateRequest); } else { final IndexRequest indexRequest = diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBaseTest.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBaseTest.java index 8e5a98e7..ec383674 100644 --- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBaseTest.java +++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBaseTest.java @@ -36,6 +36,7 @@ import java.util.Arrays; import java.util.Collections; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.RETRIES_ON_CONFLICT_OPTION; import static org.apache.flink.table.api.DataTypes.ARRAY; import static org.apache.flink.table.api.DataTypes.BIGINT; import static org.apache.flink.table.api.DataTypes.BYTES; @@ -253,4 +254,29 @@ public void testSinkParallelism() { (SinkV2Provider) esSink.getSinkRuntimeProvider(new ElasticsearchUtil.MockContext()); assertThat(provider.getParallelism()).hasValue(2); } + + @Test + public void testRetriesOnConflict() { + ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory(); + DynamicTableSink sink = + sinkFactory.createDynamicTableSink( + createPrefilledTestContext() + .withOption(RETRIES_ON_CONFLICT_OPTION.key(), "2") + .build()); + assertThat(sink).isInstanceOf(ElasticsearchDynamicSink.class); + ElasticsearchDynamicSink esSink = (ElasticsearchDynamicSink) sink; + + assertThat(esSink.config.getRetriesOnConflict()).isEqualTo(2); + } + + @Test + public void testRetriesOnConflictDefault() { + ElasticsearchDynamicSinkFactoryBase sinkFactory = createSinkFactory(); + DynamicTableSink sink = + sinkFactory.createDynamicTableSink(createPrefilledTestContext().build()); + assertThat(sink).isInstanceOf(ElasticsearchDynamicSink.class); + ElasticsearchDynamicSink esSink = (ElasticsearchDynamicSink) sink; + + assertThat(esSink.config.getRetriesOnConflict()).isEqualTo(0); + } } diff --git a/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java b/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java index 1a2cdd18..8380f72b 100644 --- a/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java +++ b/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java @@ -148,7 +148,8 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) { format, XContentType.JSON, REQUEST_FACTORY, - KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter())); + KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()), + config.getRetriesOnConflict()); final ElasticsearchSink.Builder builder = builderProvider.createBuilder(config.getHosts(), upsertFunction); diff --git a/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicTableFactory.java b/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicTableFactory.java index 34fa0e58..268560d5 100644 --- a/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicTableFactory.java +++ b/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicTableFactory.java @@ -68,6 +68,7 @@ import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.RETRIES_ON_CONFLICT_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION; import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE; import static org.apache.flink.table.connector.source.lookup.LookupOptions.MAX_RETRIES; @@ -105,7 +106,8 @@ public class Elasticsearch6DynamicTableFactory PARTIAL_CACHE_EXPIRE_AFTER_WRITE, PARTIAL_CACHE_MAX_ROWS, PARTIAL_CACHE_CACHE_MISSING_KEY, - MAX_RETRIES) + MAX_RETRIES, + RETRIES_ON_CONFLICT_OPTION) .collect(Collectors.toSet()); @Override diff --git a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java index 1926e445..35e681d6 100644 --- a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java +++ b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java @@ -143,7 +143,8 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) { format, XContentType.JSON, REQUEST_FACTORY, - KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter())); + KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()), + config.getRetriesOnConflict()); final ElasticsearchSink.Builder builder = builderProvider.createBuilder(config.getHosts(), upsertFunction); diff --git a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableFactory.java b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableFactory.java index b516777d..53ea0a12 100644 --- a/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableFactory.java +++ b/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicTableFactory.java @@ -67,6 +67,7 @@ import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION; +import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.RETRIES_ON_CONFLICT_OPTION; import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION; import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE; import static org.apache.flink.table.connector.source.lookup.LookupOptions.MAX_RETRIES; @@ -104,7 +105,8 @@ public class Elasticsearch7DynamicTableFactory PARTIAL_CACHE_EXPIRE_AFTER_WRITE, PARTIAL_CACHE_MAX_ROWS, PARTIAL_CACHE_CACHE_MISSING_KEY, - MAX_RETRIES) + MAX_RETRIES, + RETRIES_ON_CONFLICT_OPTION) .collect(Collectors.toSet()); @Override