From 4e8ed09f473bd7aea19761be7cb026b9e25d5fa3 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Thu, 21 Oct 2021 14:51:03 -0400 Subject: [PATCH] [ML] optimize source extraction for categorize_text aggregation (#79099) This optimizes the text value extraction from source in categorize_text aggregation. Early measurements indicate that the bulk of the time spent in this aggregation is inflating and deserializing the source. We can optimize this a bit (for larger sources) by only extracting the text field we care about. The main downside here is if there is a sub-agg that requires the source, the that agg will need to extract the entire source again. This should be a rare case. NOTE: opening as draft as measurements need to be done on some realistic data to see if this actually saves us time. This takes advantage of the work done here: /~https://github.com/elastic/elasticsearch/pull/77154 --- .../org/elasticsearch/xcontent/XContent.java | 4 + .../xcontent/cbor/CborXContent.java | 20 +++++ .../xcontent/json/JsonXContent.java | 20 +++++ .../xcontent/smile/SmileXContent.java | 20 +++++ .../xcontent/yaml/YamlXContent.java | 20 +++++ .../common/xcontent/XContentHelper.java | 86 ++++++++++++++++--- .../search/lookup/SourceLookup.java | 39 +++++++++ .../CategorizeTextAggregator.java | 2 +- 8 files changed, 199 insertions(+), 12 deletions(-) diff --git a/libs/x-content/src/main/java/org/elasticsearch/xcontent/XContent.java b/libs/x-content/src/main/java/org/elasticsearch/xcontent/XContent.java index d40bedf38b39..227518b44c20 100644 --- a/libs/x-content/src/main/java/org/elasticsearch/xcontent/XContent.java +++ b/libs/x-content/src/main/java/org/elasticsearch/xcontent/XContent.java @@ -82,6 +82,10 @@ XContentParser createParser(NamedXContentRegistry xContentRegistry, DeprecationH XContentParser createParser(NamedXContentRegistry xContentRegistry, DeprecationHandler deprecationHandler, byte[] data, int offset, int length) throws IOException; + XContentParser createParser(NamedXContentRegistry xContentRegistry, + DeprecationHandler deprecationHandler, byte[] data, int offset, int length, FilterPath[] includes, + FilterPath[] excludes) throws IOException; + /** * Creates a parser over the provided reader. */ diff --git a/libs/x-content/src/main/java/org/elasticsearch/xcontent/cbor/CborXContent.java b/libs/x-content/src/main/java/org/elasticsearch/xcontent/cbor/CborXContent.java index 9dfb6f47f7e8..d43e3b10b225 100644 --- a/libs/x-content/src/main/java/org/elasticsearch/xcontent/cbor/CborXContent.java +++ b/libs/x-content/src/main/java/org/elasticsearch/xcontent/cbor/CborXContent.java @@ -112,6 +112,26 @@ public XContentParser createParser(NamedXContentRegistry xContentRegistry, return createParserForCompatibility(xContentRegistry, deprecationHandler, data, offset, length, RestApiVersion.current()); } + @Override + public XContentParser createParser( + NamedXContentRegistry xContentRegistry, + DeprecationHandler deprecationHandler, + byte[] data, + int offset, + int length, + FilterPath[] includes, + FilterPath[] excludes + ) throws IOException { + return new CborXContentParser( + xContentRegistry, + deprecationHandler, + cborFactory.createParser(new ByteArrayInputStream(data, offset, length)), + RestApiVersion.current(), + includes, + excludes + ); + } + @Override public XContentParser createParser(NamedXContentRegistry xContentRegistry, DeprecationHandler deprecationHandler, Reader reader) throws IOException { diff --git a/libs/x-content/src/main/java/org/elasticsearch/xcontent/json/JsonXContent.java b/libs/x-content/src/main/java/org/elasticsearch/xcontent/json/JsonXContent.java index cf551f576131..10df2c1c10d8 100644 --- a/libs/x-content/src/main/java/org/elasticsearch/xcontent/json/JsonXContent.java +++ b/libs/x-content/src/main/java/org/elasticsearch/xcontent/json/JsonXContent.java @@ -113,6 +113,26 @@ public XContentParser createParser(NamedXContentRegistry xContentRegistry, return createParserForCompatibility(xContentRegistry, deprecationHandler, data, offset, length, RestApiVersion.current()); } + @Override + public XContentParser createParser( + NamedXContentRegistry xContentRegistry, + DeprecationHandler deprecationHandler, + byte[] data, + int offset, + int length, + FilterPath[] includes, + FilterPath[] excludes + ) throws IOException { + return new JsonXContentParser( + xContentRegistry, + deprecationHandler, + jsonFactory.createParser(new ByteArrayInputStream(data, offset, length)), + RestApiVersion.current(), + includes, + excludes + ); + } + @Override public XContentParser createParser(NamedXContentRegistry xContentRegistry, DeprecationHandler deprecationHandler, Reader reader) throws IOException { diff --git a/libs/x-content/src/main/java/org/elasticsearch/xcontent/smile/SmileXContent.java b/libs/x-content/src/main/java/org/elasticsearch/xcontent/smile/SmileXContent.java index e02f8ec307af..696865a24283 100644 --- a/libs/x-content/src/main/java/org/elasticsearch/xcontent/smile/SmileXContent.java +++ b/libs/x-content/src/main/java/org/elasticsearch/xcontent/smile/SmileXContent.java @@ -114,6 +114,26 @@ public XContentParser createParser(NamedXContentRegistry xContentRegistry, return createParserForCompatibility(xContentRegistry, deprecationHandler, data, offset, length, RestApiVersion.current()); } + @Override + public XContentParser createParser( + NamedXContentRegistry xContentRegistry, + DeprecationHandler deprecationHandler, + byte[] data, + int offset, + int length, + FilterPath[] includes, + FilterPath[] excludes + ) throws IOException { + return new SmileXContentParser( + xContentRegistry, + deprecationHandler, + smileFactory.createParser(new ByteArrayInputStream(data, offset, length)), + RestApiVersion.current(), + includes, + excludes + ); + } + @Override public XContentParser createParser(NamedXContentRegistry xContentRegistry, DeprecationHandler deprecationHandler, Reader reader) throws IOException { diff --git a/libs/x-content/src/main/java/org/elasticsearch/xcontent/yaml/YamlXContent.java b/libs/x-content/src/main/java/org/elasticsearch/xcontent/yaml/YamlXContent.java index b3a684d20583..68f1ac2bbf27 100644 --- a/libs/x-content/src/main/java/org/elasticsearch/xcontent/yaml/YamlXContent.java +++ b/libs/x-content/src/main/java/org/elasticsearch/xcontent/yaml/YamlXContent.java @@ -106,6 +106,26 @@ public XContentParser createParser(NamedXContentRegistry xContentRegistry, return createParserForCompatibility(xContentRegistry, deprecationHandler, data, offset, length, RestApiVersion.current()); } + @Override + public XContentParser createParser( + NamedXContentRegistry xContentRegistry, + DeprecationHandler deprecationHandler, + byte[] data, + int offset, + int length, + FilterPath[] includes, + FilterPath[] excludes + ) throws IOException { + return new YamlXContentParser( + xContentRegistry, + deprecationHandler, + yamlFactory.createParser(new ByteArrayInputStream(data, offset, length)), + RestApiVersion.current(), + includes, + excludes + ); + } + @Override public XContentParser createParser(NamedXContentRegistry xContentRegistry, DeprecationHandler deprecationHandler, Reader reader) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java b/server/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java index 5c891843c043..6a5e253ffffe 100644 --- a/server/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java +++ b/server/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Tuple; import org.elasticsearch.common.compress.Compressor; import org.elasticsearch.common.compress.CompressorFactory; @@ -27,6 +28,7 @@ import org.elasticsearch.xcontent.XContentParseException; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.support.filtering.FilterPath; import java.io.BufferedInputStream; import java.io.IOException; @@ -101,6 +103,14 @@ public static Tuple> convertToMap(BytesReferen return convertToMap(bytes, ordered, null); } + /** + * Exactly the same as {@link XContentHelper#convertToMap(BytesReference, boolean, XContentType, FilterPath[], FilterPath[])} but + * none of the fields are filtered + */ + public static Tuple> convertToMap(BytesReference bytes, boolean ordered, XContentType xContentType) { + return convertToMap(bytes, ordered, xContentType, null, null); + } + /** * Converts the given bytes into a map that is optionally ordered. The provided {@link XContentType} must be non-null. *

@@ -110,8 +120,13 @@ public static Tuple> convertToMap(BytesReferen * frequently when folks write nanosecond precision dates as a decimal * number. */ - public static Tuple> convertToMap(BytesReference bytes, boolean ordered, XContentType xContentType) - throws ElasticsearchParseException { + public static Tuple> convertToMap( + BytesReference bytes, + boolean ordered, + XContentType xContentType, + @Nullable FilterPath[] include, + @Nullable FilterPath[] exclude + ) throws ElasticsearchParseException { try { final XContentType contentType; InputStream input; @@ -129,14 +144,16 @@ public static Tuple> convertToMap(BytesReferen final int length = bytes.length(); contentType = xContentType != null ? xContentType : XContentFactory.xContentType(raw, offset, length); return new Tuple<>(Objects.requireNonNull(contentType), - convertToMap(XContentFactory.xContent(contentType), raw, offset, length, ordered)); + convertToMap(XContentFactory.xContent(contentType), raw, offset, length, ordered, include, exclude)); } else { input = bytes.streamInput(); contentType = xContentType != null ? xContentType : XContentFactory.xContentType(input); } try (InputStream stream = input) { - return new Tuple<>(Objects.requireNonNull(contentType), - convertToMap(XContentFactory.xContent(contentType), stream, ordered)); + return new Tuple<>( + Objects.requireNonNull(contentType), + convertToMap(XContentFactory.xContent(contentType), stream, ordered, include, exclude) + ); } } catch (IOException e) { throw new ElasticsearchParseException("Failed to parse content to map", e); @@ -158,14 +175,35 @@ public static Map convertToMap(XContent xContent, String string, } /** - * Convert a string in some {@link XContent} format to a {@link Map}. Throws an {@link ElasticsearchParseException} if there is any - * error. Note that unlike {@link #convertToMap(BytesReference, boolean)}, this doesn't automatically uncompress the input. + * The same as {@link XContentHelper#convertToMap(XContent, byte[], int, int, boolean, FilterPath[], FilterPath[])} but none of the + * fields are filtered. */ public static Map convertToMap(XContent xContent, InputStream input, boolean ordered) throws ElasticsearchParseException { + return convertToMap(xContent, input, ordered, null, null); + } + + /** + * Convert a string in some {@link XContent} format to a {@link Map}. Throws an {@link ElasticsearchParseException} if there is any + * error. Note that unlike {@link #convertToMap(BytesReference, boolean)}, this doesn't automatically uncompress the input. + * + * Additionally, fields may be included or excluded from the parsing. + */ + public static Map convertToMap( + XContent xContent, + InputStream input, + boolean ordered, + @Nullable FilterPath[] include, + @Nullable FilterPath[] exclude + ) throws ElasticsearchParseException { // It is safe to use EMPTY here because this never uses namedObject - try (XContentParser parser = xContent.createParser(NamedXContentRegistry.EMPTY, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, input)) { + try (XContentParser parser = xContent.createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + input, + include, + exclude + )) { return ordered ? parser.mapOrdered() : parser.map(); } catch (IOException e) { throw new ElasticsearchParseException("Failed to parse content to map", e); @@ -178,9 +216,35 @@ public static Map convertToMap(XContent xContent, InputStream in */ public static Map convertToMap(XContent xContent, byte[] bytes, int offset, int length, boolean ordered) throws ElasticsearchParseException { + return convertToMap(xContent, bytes, offset, length, ordered, null, null); + } + + /** + * Convert a byte array in some {@link XContent} format to a {@link Map}. Throws an {@link ElasticsearchParseException} if there is any + * error. Note that unlike {@link #convertToMap(BytesReference, boolean)}, this doesn't automatically uncompress the input. + * + * Unlike {@link XContentHelper#convertToMap(XContent, byte[], int, int, boolean)} this optionally accepts fields to include or exclude + * during XContent parsing. + */ + public static Map convertToMap( + XContent xContent, + byte[] bytes, + int offset, + int length, + boolean ordered, + @Nullable FilterPath[] include, + @Nullable FilterPath[] exclude + ) throws ElasticsearchParseException { // It is safe to use EMPTY here because this never uses namedObject - try (XContentParser parser = xContent.createParser(NamedXContentRegistry.EMPTY, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, bytes, offset, length)) { + try (XContentParser parser = xContent.createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + bytes, + offset, + length, + include, + exclude) + ) { return ordered ? parser.mapOrdered() : parser.map(); } catch (IOException e) { throw new ElasticsearchParseException("Failed to parse content to map", e); diff --git a/server/src/main/java/org/elasticsearch/search/lookup/SourceLookup.java b/server/src/main/java/org/elasticsearch/search/lookup/SourceLookup.java index 505ea16927e9..d498c9cffe46 100644 --- a/server/src/main/java/org/elasticsearch/search/lookup/SourceLookup.java +++ b/server/src/main/java/org/elasticsearch/search/lookup/SourceLookup.java @@ -20,6 +20,7 @@ import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.index.fieldvisitor.FieldsVisitor; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; +import org.elasticsearch.xcontent.support.filtering.FilterPath; import java.io.IOException; import java.util.Collection; @@ -157,6 +158,44 @@ public List extractRawValues(String path) { return XContentMapValues.extractRawValues(path, source()); } + /** + * Returns the values associated with the path. Those are "low" level values, and it can + * handle path expression where an array/list is navigated within. + * + * The major difference with {@link SourceLookup#extractRawValues(String)} is that this version will: + * + * - not cache source if it's not already parsed + * - will only extract the desired values from the compressed source instead of deserializing the whole object + * + * This is useful when the caller only wants a single value from source and does not care of source is fully parsed and cached + * for later use. + * @param path The path from which to extract the values from source + * @return The list of found values or an empty list if none are found + */ + public List extractRawValuesWithoutCaching(String path) { + if (source != null) { + return XContentMapValues.extractRawValues(path, source); + } + FilterPath[] filterPaths = FilterPath.compile(Set.of(path)); + if (sourceAsBytes != null) { + return XContentMapValues.extractRawValues( + path, + XContentHelper.convertToMap(sourceAsBytes, false, null, filterPaths, null).v2() + ); + } + try { + FieldsVisitor sourceFieldVisitor = new FieldsVisitor(true); + fieldReader.accept(docId, sourceFieldVisitor); + BytesReference source = sourceFieldVisitor.source(); + return XContentMapValues.extractRawValues( + path, + XContentHelper.convertToMap(source, false, null, filterPaths, null).v2() + ); + } catch (Exception e) { + throw new ElasticsearchParseException("failed to parse / load source", e); + } + } + /** * For the provided path, return its value in the source. * diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/categorization/CategorizeTextAggregator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/categorization/CategorizeTextAggregator.java index f491be16c71f..386c41255747 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/categorization/CategorizeTextAggregator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/categorization/CategorizeTextAggregator.java @@ -183,7 +183,7 @@ public void collect(int doc, long owningBucketOrd) throws IOException { private void collectFromSource(int doc, long owningBucketOrd, CategorizationTokenTree categorizer) throws IOException { sourceLookup.setSegmentAndDocument(ctx, doc); - Iterator itr = sourceLookup.extractRawValues(sourceFieldName).stream().map(obj -> { + Iterator itr = sourceLookup.extractRawValuesWithoutCaching(sourceFieldName).stream().map(obj -> { if (obj == null) { return null; }