Skip to content

Commit

Permalink
Added backticks to spath keys and enabled the tests (#355)
Browse files Browse the repository at this point in the history
* Added backticks to spath keys to escape dots in the path, enabled eval spath test and changed expected column arrangement in aggregateAfterHdfsLoadTest

* added objects for escaping backticks, tests for the new objects

* Applied spotless

* used SpathEscapedKey to escape backticks in evalstatement

* re-reenabled eval spath tests after rebase

* Removed SpathUnescapedKey and added SpathKey that returns SpathEscapedKey object

* removed SpathEscapedKey,SpathKey and their tests.Added backtick support to the UnquotedText,added QoutedText and their tests

* Removed regex replaced with a ctor parameter to specify quote character
  • Loading branch information
Abigael-JT authored Dec 2, 2024
1 parent 8387db5 commit 3ae684c
Show file tree
Hide file tree
Showing 10 changed files with 357 additions and 67 deletions.
94 changes: 94 additions & 0 deletions src/main/java/com/teragrep/pth10/ast/QuotedText.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Teragrep Data Processing Language (DPL) translator for Apache Spark (pth_10)
* Copyright (C) 2019-2024 Suomen Kanuuna Oy
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*
* Additional permission under GNU Affero General Public License version 3
* section 7
*
* If you modify this Program, or any covered work, by linking or combining it
* with other code, such other code is not for that reason alone subject to any
* of the requirements of the GNU Affero GPL version 3 as long as this Program
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
* modifications.
*
* Supplemented terms under GNU Affero General Public License version 3
* section 7
*
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
* versions must be marked as "Modified version of" The Program.
*
* Names of the licensors and authors may not be used for publicity purposes.
*
* No rights are granted for use of trade names, trademarks, or service marks
* which are in The Program if any.
*
* Licensee must indemnify licensors and authors for any liability that these
* contractual assumptions impose on licensors and authors.
*
* To the extent this program is licensed as part of the Commercial versions of
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/
package com.teragrep.pth10.ast;

import java.util.Objects;

public class QuotedText implements Text {

private final Text origin;
private final String quoteCharacter;

public QuotedText(final Text origin, final String quoteCharacter) {
this.origin = origin;
this.quoteCharacter = quoteCharacter;
}

@Override
public String read() {
validate();
return quotes(origin.read());
}

private void validate() {
if (quoteCharacter.length() != 1) {
throw new IllegalArgumentException("Quote character should be a single character");
}
}

private String quotes(final String s) {
if (s.startsWith(quoteCharacter) && s.endsWith(quoteCharacter)) {
return s;
}

return quoteCharacter.concat(s).concat(quoteCharacter);
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
QuotedText that = (QuotedText) o;
return Objects.equals(origin, that.origin) && Objects.equals(quoteCharacter, that.quoteCharacter);
}

@Override
public int hashCode() {
return Objects.hash(origin, quoteCharacter);
}
}
18 changes: 18 additions & 0 deletions src/main/java/com/teragrep/pth10/ast/TextString.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
*/
package com.teragrep.pth10.ast;

import java.util.Objects;

/**
* Basic text from String
*/
Expand All @@ -69,4 +71,20 @@ public TextString(Object text) {
public String read() {
return text;
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
TextString that = (TextString) o;
return Objects.equals(text, that.text);
}

@Override
public int hashCode() {
return Objects.hashCode(text);
}

}
58 changes: 42 additions & 16 deletions src/main/java/com/teragrep/pth10/ast/UnquotedText.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,45 +45,71 @@
*/
package com.teragrep.pth10.ast;

import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.Objects;

/**
* Decorator for unquoting text.
*/
public class UnquotedText implements Text {

private Text origin;
private final Text origin;
private final String[] quoteCharacters;

public UnquotedText(Text origin) {
this(origin, new String[] {
"\"", "'", "`"
});
}

public UnquotedText(Text origin, String ... quoteCharacters) {
this.origin = origin;
this.quoteCharacters = quoteCharacters;
}

@Override
public String read() {
validate();
return stripQuotes(this.origin.read());
}

private void validate() {
if (quoteCharacters.length <= 0) {
throw new IllegalArgumentException("Quote character(s) must be provided!");
}
}

/**
* Strips quotes
*
* @return string with stripped quotes
*/
private String stripQuotes(String quoted) {
Matcher m = Pattern.compile("^\"(.*)\"$").matcher(quoted);
Matcher m1 = Pattern.compile("^'(.*)'$").matcher(quoted);
private String stripQuotes(final String quoted) {
String rv = quoted;

String strUnquoted = quoted;
// check "-quotes
if (m.find()) {
strUnquoted = m.group(1);
}
else {
// check '-quotes
if (m1.find()) {
strUnquoted = m1.group(1);
// Removes outer quotes
for (int i = 0; i < quoteCharacters.length; i++) {
final String quoteCharacter = quoteCharacters[i];
if (rv.startsWith(quoteCharacter) && rv.endsWith(quoteCharacter)) {
rv = rv.substring(quoteCharacter.length(), rv.length() - quoteCharacter.length());
break;
}
}
return strUnquoted;

return rv;
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
UnquotedText that = (UnquotedText) o;
return Objects.equals(origin, that.origin);
}

@Override
public int hashCode() {
return Objects.hashCode(origin);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
package com.teragrep.pth10.ast.commands.evalstatement;

import com.teragrep.pth10.ast.DPLParserCatalystContext;
import com.teragrep.pth10.ast.QuotedText;
import com.teragrep.pth10.ast.TextString;
import com.teragrep.pth10.ast.UnquotedText;
import com.teragrep.pth10.ast.bo.*;
Expand Down Expand Up @@ -2729,7 +2730,8 @@ private Node evalMethodSpathEmitCatalyst(DPLParser.EvalMethodSpathContext ctx) {
ss.udf().register("SpathUDF", SpathUDF);

Column res = functions.callUDF("SpathUDF", input, spathExpr, functions.lit(""), functions.lit(""));
rv = new ColumnNode(res);
// wrap expression in backticks to escape dots
rv = new ColumnNode(res.getItem(new QuotedText(new TextString(spathExpr.toString()), "`").read()));

return rv;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@

import com.google.gson.*;
import com.teragrep.pth10.ast.NullValue;
import com.teragrep.pth10.ast.QuotedText;
import com.teragrep.pth10.ast.TextString;
import com.teragrep.pth10.ast.UnquotedText;
import org.apache.commons.text.StringEscapeUtils;
Expand Down Expand Up @@ -91,6 +92,16 @@ public Spath(NullValue nullValue) {
this.nullValue = nullValue;
}

/**
* Returns result of spath as a map Keys are wrapped in backticks to escape dots, spark uses them for maps
*
* @param input json/xml input
* @param spathExpr spath/xpath expression
* @param nameOfInputCol name of input column
* @param nameOfOutputCol name of output column
* @return map of results
* @throws Exception
*/
@Override
public Map<String, String> call(String input, String spathExpr, String nameOfInputCol, String nameOfOutputCol)
throws Exception {
Expand All @@ -107,12 +118,7 @@ public Map<String, String> call(String input, String spathExpr, String nameOfInp
for (Map.Entry<String, JsonElement> sub : jsonElem.getAsJsonObject().entrySet()) {
// put key:value to map - unescaping result in case was a nested json string
result
.put(
sub.getKey(),
new UnquotedText(
new TextString(StringEscapeUtils.unescapeJson(sub.getValue().toString()))
).read()
);
.put(new QuotedText(new TextString(sub.getKey()), "`").read(), new UnquotedText(new TextString(StringEscapeUtils.unescapeJson(sub.getValue().toString()))).read());
}
}
// Manual extraction via spath expression (JSON)
Expand All @@ -122,12 +128,7 @@ jsonElem, new UnquotedText(new TextString(spathExpr)).read()
);
// put key:value to map - unescaping result in case was a nested json string
result
.put(
spathExpr,
jsonSubElem != null ? new UnquotedText(
new TextString(StringEscapeUtils.unescapeJson(jsonSubElem.toString()))
).read() : nullValue.value()
);
.put(new QuotedText(new TextString(spathExpr), "`").read(), jsonSubElem != null ? new UnquotedText(new TextString(StringEscapeUtils.unescapeJson(jsonSubElem.toString()))).read() : nullValue.value());
}
return result;
}
Expand Down Expand Up @@ -162,19 +163,19 @@ jsonElem, new UnquotedText(new TextString(spathExpr)).read()
LOGGER.debug("spath->xpath conversion: <[{}]>", spathAsXpath);

String rv = (String) xPath.compile(spathAsXpath).evaluate(doc, XPathConstants.STRING);
result.put(spathExpr, rv.trim());
result.put(new QuotedText(new TextString(spathExpr), "`").read(), rv.trim());
}
return result;
}
catch (Exception e) {
LOGGER.warn("spath: The content couldn't be parsed as JSON or XML. Details: <{}>", e.getMessage());
// return pre-existing content if output is the same as input
if (nameOfInputCol.equals(nameOfOutputCol)) {
result.put(spathExpr, input);
result.put(new QuotedText(new TextString(spathExpr), "`").read(), input);
}
// otherwise output will be empty on error
else {
result.put(spathExpr, nullValue.value());
result.put(new QuotedText(new TextString(spathExpr), "`").read(), nullValue.value());
}
return result;
}
Expand Down Expand Up @@ -277,12 +278,13 @@ private void buildMapFromXmlNodes(final Node rootNode, final String spacer, fina
}

// if there are multiple columns of the same name, add value to existing column
if (map.containsKey(colName)) {
String existingValue = map.get(colName);
map.put(colName, existingValue.concat("\n").concat(rootNode.getTextContent()));
final String quotedColName = new QuotedText(new TextString(colName), "`").read();
if (map.containsKey(quotedColName)) {
map
.computeIfPresent(quotedColName, (k, existingValue) -> existingValue.concat("\n").concat(rootNode.getTextContent()));
}
else {
map.put(colName, rootNode.getTextContent());
map.put(quotedColName, rootNode.getTextContent());
}
}

Expand Down
23 changes: 12 additions & 11 deletions src/main/java/com/teragrep/pth10/steps/spath/SpathStep.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
package com.teragrep.pth10.steps.spath;

import com.teragrep.pth10.ast.MapTypeColumn;
import com.teragrep.pth10.ast.QuotedText;
import com.teragrep.pth10.ast.TextString;
import com.teragrep.pth10.ast.UnquotedText;
import com.teragrep.pth10.ast.commands.evalstatement.UDFs.Spath;
Expand Down Expand Up @@ -86,7 +87,8 @@ public Dataset<Row> get(Dataset<Row> dataset) throws StreamingQueryException {

// Not in auto-extraction mode: can just return the first and only value from the map
if (!autoExtractionMode) {
return dataset.withColumn(new UnquotedText(new TextString(outputColumn)).read(), spathExpr.getItem(path));
return dataset
.withColumn(new UnquotedText(new TextString(outputColumn)).read(), spathExpr.getItem(new QuotedText(new TextString(path), "`").read()));
}

//
Expand All @@ -106,18 +108,17 @@ public Dataset<Row> get(Dataset<Row> dataset) throws StreamingQueryException {
// Each key is a new column with the cell contents being the value for that key

// Check for nulls; return an empty string if null, otherwise value for given key
// use substring to remove backticks that were added to escape dots in key name
for (String key : keys) {
withAppliedUdfDs = withAppliedUdfDs
.withColumn(
key, functions
.when(
/* if key.value == null */
functions.isnull(withAppliedUdfDs.col(outputColumn).getItem(key)),
/* then return empty string */
functions.lit("")
)
/* otherwise return key.value */
.otherwise(withAppliedUdfDs.col(outputColumn).getItem(key))
.withColumn(new UnquotedText(new TextString(key)).read(), functions.when(
/* if key.value == null */
functions.isnull(withAppliedUdfDs.col(outputColumn).getItem(key)),
/* then return empty string */
functions.lit("")
)
/* otherwise return key.value */
.otherwise(withAppliedUdfDs.col(outputColumn).getItem(key))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public void aggregateAfterHdfsLoadTest() {
new MetadataBuilder().build()
),
new StructField("host", DataTypes.StringType, true, new MetadataBuilder().build()),
new StructField("source", DataTypes.StringType, true, new MetadataBuilder().build()), new StructField("partition", DataTypes.StringType, true, new MetadataBuilder().build()), new StructField("offset", DataTypes.LongType, true, new MetadataBuilder().build()), new StructField("atmosphere_water_vapor_content", DataTypes.StringType, true, new MetadataBuilder().build()), new StructField("atmosphere_cloud_liquid_water_content", DataTypes.StringType, true, new MetadataBuilder().build()), new StructField("rainfall_rate", DataTypes.StringType, true, new MetadataBuilder().build()), new StructField("latitude", DataTypes.StringType, true, new MetadataBuilder().build()), new StructField("wind_speed", DataTypes.StringType, true, new MetadataBuilder().build())
new StructField("source", DataTypes.StringType, true, new MetadataBuilder().build()), new StructField("partition", DataTypes.StringType, true, new MetadataBuilder().build()), new StructField("offset", DataTypes.LongType, true, new MetadataBuilder().build()), new StructField("wind_speed", DataTypes.StringType, true, new MetadataBuilder().build()), new StructField("latitude", DataTypes.StringType, true, new MetadataBuilder().build()), new StructField("rainfall_rate", DataTypes.StringType, true, new MetadataBuilder().build()), new StructField("atmosphere_cloud_liquid_water_content", DataTypes.StringType, true, new MetadataBuilder().build()), new StructField("atmosphere_water_vapor_content", DataTypes.StringType, true, new MetadataBuilder().build())
}), ds.schema());
}
);
Expand Down
Loading

0 comments on commit 3ae684c

Please sign in to comment.