Bloom aggregator string list support #380

Merged: 8 commits, Oct 23, 2024
@@ -47,6 +47,7 @@

import com.teragrep.functions.dpf_03.BloomFilterAggregator;
import com.teragrep.pth10.steps.AbstractStep;
import com.teragrep.pth10.steps.teragrep.aggregate.ColumnBinaryListingDataset;
import com.teragrep.pth10.steps.teragrep.bloomfilter.BloomFilterForeachPartitionFunction;
import com.teragrep.pth10.steps.teragrep.bloomfilter.BloomFilterTable;
import com.teragrep.pth10.steps.teragrep.bloomfilter.FilterTypes;
@@ -161,14 +162,17 @@ private Dataset<Row> estimateSize(Dataset<Row> dataset) {
.agg(functions.approxCountDistinct("token").as(outputCol));
}

public Dataset<Row> aggregate(Dataset<Row> dataset) {

FilterTypes filterTypes = new FilterTypes(this.zeppelinConfig);

BloomFilterAggregator agg = new BloomFilterAggregator(inputCol, estimateCol, filterTypes.sortedMap());

return dataset.groupBy("partition").agg(agg.toColumn().as("bloomfilter"));

public Dataset<Row> aggregate(final Dataset<Row> dataset) {
final ColumnBinaryListingDataset colBinaryListingDataset = new ColumnBinaryListingDataset(dataset, inputCol);
final BloomFilterAggregator bloomFilterAggregator = new BloomFilterAggregator(
inputCol,
estimateCol,
new FilterTypes(this.zeppelinConfig).sortedMap()
);
return colBinaryListingDataset
.dataset()
.groupBy("partition")
.agg(bloomFilterAggregator.toColumn().as("bloomfilter"));
}

private void writeFilterTypes(final Config config) {
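For orientation, a minimal sketch of how the changed aggregate step is driven, with identifiers and configuration borrowed from the new tests further down. `ds` (a dataset carrying the token column) and `properties` are assumed to be in scope, so this is an illustration rather than code from the pull request:

// Sketch under assumptions: ds carries a "tokens" column of array<string> (e.g. regexextract
// output) or array<binary> (tokenizer "format bytes" output), and properties holds
// dpl.pth_06.bloom.db.fields = "[ {expected: 100, fpp: 0.01}]" as in the tests below.
final Config cfg = ConfigFactory.parseProperties(properties);
final TeragrepBloomStep step = new TeragrepBloomStep(
        cfg,
        TeragrepBloomStep.BloomMode.AGGREGATE,
        "tokens",
        "bloomfilter",
        "R_estimate(tokens)"
);
// groups by "partition" and emits one serialized bloom filter per group in the "bloomfilter" column
final Dataset<Row> aggregated = step.aggregate(ds);

The step now tolerates both array<binary> token columns and array<string> columns, which is the point of the new ColumnBinaryListingDataset introduced below.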
@@ -0,0 +1,101 @@
/*
* Teragrep Data Processing Language (DPL) translator for Apache Spark (pth_10)
* Copyright (C) 2019-2024 Suomen Kanuuna Oy
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*
* Additional permission under GNU Affero General Public License version 3
* section 7
*
* If you modify this Program, or any covered work, by linking or combining it
* with other code, such other code is not for that reason alone subject to any
* of the requirements of the GNU Affero GPL version 3 as long as this Program
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
* modifications.
*
* Supplemented terms under GNU Affero General Public License version 3
* section 7
*
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
* versions must be marked as "Modified version of" The Program.
*
* Names of the licensors and authors may not be used for publicity purposes.
*
* No rights are granted for use of trade names, trademarks, or service marks
* which are in The Program if any.
*
* Licensee must indemnify licensors and authors for any liability that these
* contractual assumptions impose on licensors and authors.
*
* To the extent this program is licensed as part of the Commercial versions of
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/
package com.teragrep.pth10.steps.teragrep.aggregate;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.*;
import scala.collection.mutable.WrappedArray;

import java.nio.charset.StandardCharsets;

public final class ColumnBinaryListingDataset {

private final Dataset<Row> dataset;
private final String inputCol;

public ColumnBinaryListingDataset(Dataset<Row> dataset, String inputCol) {
this.dataset = dataset;
this.inputCol = inputCol;
}

private Dataset<Row> toBinaryList() {
// use Scala WrappedArray to avoid casting errors between Java arrays and Spark
final UDF1<WrappedArray<String>, WrappedArray<byte[]>> udf = stringList -> {
final byte[][] bytesArray = new byte[stringList.length()][];
for (int i = 0; i < stringList.length(); i++) {
bytesArray[i] = stringList.apply(i).getBytes(StandardCharsets.UTF_8);
}
return WrappedArray.make(bytesArray);
};
return dataset
.withColumn(inputCol, functions.udf(udf, DataTypes.createArrayType(DataTypes.BinaryType)).apply(dataset.col(inputCol)));
}

public Dataset<Row> dataset() {
final Dataset<Row> binaryDataset;
final DataType datatype = dataset.schema().apply(inputCol).dataType();
final boolean isStringArray = datatype.sameType(DataTypes.createArrayType(DataTypes.StringType));
final boolean isBinaryArray = datatype.sameType(DataTypes.createArrayType(DataTypes.BinaryType));
// if the column is already a binary array, return the dataset unchanged
if (isBinaryArray) {
binaryDataset = dataset;
}
// convert a list of strings to a list of byte arrays
else if (isStringArray) {
binaryDataset = toBinaryList();
}
else { // add other types if needed
throw new RuntimeException(
"Input column <" + inputCol + "> has unsupported column type <" + datatype
+ ">, supported types are ArrayType(BinaryType), ArrayType(StringType)"
);
}
return binaryDataset;
}
}
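A minimal usage sketch of the new class in isolation; it assumes a local SparkSession and a hypothetical single-column dataset, and is not part of the pull request:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import com.teragrep.pth10.steps.teragrep.aggregate.ColumnBinaryListingDataset;

public final class ColumnBinaryListingDatasetSketch {

    public static void main(final String[] args) {
        final SparkSession spark = SparkSession.builder().appName("sketch").master("local[*]").getOrCreate();
        // hypothetical input: one row with an array<string> "tokens" column
        final StructType schema = new StructType(new StructField[] {
                new StructField("tokens", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
        });
        final List<Row> rows = Collections
                .singletonList(RowFactory.create(Arrays.asList("userid", "userid=", "bc113100-b859-4041-b272-88b849f6d6db")));
        final Dataset<Row> input = spark.createDataFrame(rows, schema);
        // dataset() rewrites the column as array<binary>; an array<binary> column would pass through unchanged
        final Dataset<Row> binary = new ColumnBinaryListingDataset(input, "tokens").dataset();
        binary.printSchema(); // element type of "tokens" is now reported as binary
        spark.stop();
    }
}

The same call is a pass-through for a column that is already ArrayType(BinaryType); any other column type raises the RuntimeException shown above.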
157 changes: 145 additions & 12 deletions src/test/java/com/teragrep/pth10/BloomFilterOperationsTest.java
@@ -45,24 +45,29 @@
*/
package com.teragrep.pth10;

import com.teragrep.pth10.steps.teragrep.TeragrepBloomStep;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.sketch.BloomFilter;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
import java.io.ByteArrayInputStream;
import java.util.*;
import java.util.stream.Collectors;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class BloomFilterOperationsTest {

private static final Logger LOGGER = LoggerFactory.getLogger(BloomFilterOperationsTest.class);
private final String testFile = "src/test/resources/xmlWalkerTestDataStreaming/bloomTeragrepStep_data*.jsonl";
private final String aggregateFile = "src/test/resources/xmlWalkerTestDataStreaming/bloomTeragrepStep_aggregation_data*.jsonl";

private final StructType testSchema = new StructType(new StructField[] {
new StructField("_time", DataTypes.TimestampType, false, new MetadataBuilder().build()),
@@ -82,21 +87,11 @@ public class BloomFilterOperationsTest {
void setEnv() {
streamingTestUtil = new StreamingTestUtil(this.testSchema);
streamingTestUtil.setEnv();
/*
Class.forName ("org.h2.Driver");
this.conn = DriverManager.getConnection("jdbc:h2:~/test;MODE=MariaDB;DATABASE_TO_LOWER=TRUE;CASE_INSENSITIVE_IDENTIFIERS=TRUE", "sa", "");
org.h2.tools.RunScript.execute(conn, new FileReader("src/test/resources/bloomdb/bloomdb.sql"));
*/
}

@BeforeEach
void setUp() {
streamingTestUtil.setUp();
/*
conn.prepareStatement("TRUNCATE TABLE filter_expected_100000_fpp_001").execute();
conn.prepareStatement("TRUNCATE TABLE filter_expected_1000000_fpp_003").execute();
conn.prepareStatement("TRUNCATE TABLE filter_expected_2500000_fpp_005").execute();
*/
}

@AfterEach
@@ -132,4 +127,142 @@ public void estimateTest() {
}
);
}

@Test
@DisabledIfSystemProperty(
named = "skipSparkTest",
matches = "true"
)
public void testAggregateWithTokenizerFormatBytes() {
final String id = UUID.randomUUID().toString();
final Properties properties = new Properties();
properties.put("dpl.pth_06.bloom.db.fields", "[ {expected: 100, fpp: 0.01}]");
streamingTestUtil
.performDPLTest(
"index=index_A earliest=2020-01-01T00:00:00z latest=2023-01-01T00:00:00z "
+ "| teragrep exec tokenizer format bytes "
+ "| teragrep exec hdfs save overwrite=true /tmp/pth_10_hdfs/aggregatorTokenBytes/"
+ id,
aggregateFile, ds -> {
}
);
this.streamingTestUtil.setUp();
streamingTestUtil
.performDPLTest(
"| teragrep exec hdfs load /tmp/pth_10_hdfs/aggregatorTokenBytes/" + id + " "
+ "| teragrep exec bloom estimate "
+ "| teragrep exec hdfs save overwrite=true /tmp/pth_10_hdfs/aggregatorEstimate/" + id,
aggregateFile, ds -> {
}
);
this.streamingTestUtil.setUp();
streamingTestUtil
.performDPLTest(
"| teragrep exec hdfs load /tmp/pth_10_hdfs/aggregatorTokenBytes/" + id
+ "| join type=inner max=0 partition [| teragrep exec hdfs load /tmp/pth_10_hdfs/aggregatorEstimate/"
+ id,
aggregateFile, ds -> {
Config config = ConfigFactory.parseProperties(properties);
TeragrepBloomStep step = new TeragrepBloomStep(
config,
TeragrepBloomStep.BloomMode.AGGREGATE,
"tokens",
"bloomfilter",
"R_estimate(tokens)"
);
ds = step.aggregate(ds);
List<byte[]> listOfResult = ds
.select("bloomfilter")
.collectAsList()
.stream()
.map(r -> (byte[]) r.get(0))
.collect(Collectors.toList());
Assertions.assertEquals(2, listOfResult.size());
BloomFilter filter1 = Assertions
.assertDoesNotThrow(
() -> BloomFilter.readFrom(new ByteArrayInputStream(listOfResult.get(0)))
);
// should find all tokens
Assertions.assertTrue(filter1.mightContain("bc113100-b859-4041-b272-88b849f6d6db"));
Assertions.assertTrue(filter1.mightContain("userid"));
Assertions.assertTrue(filter1.mightContain("userid="));
Assertions.assertTrue(filter1.mightContain("userid=bc113100-b859-4041-b272-88b849f6d6db"));

}
);
}

@Test
@DisabledIfSystemProperty(
named = "skipSparkTest",
matches = "true"
)
public void testAggregateUsingRegexExtract() {
final Properties properties = new Properties();
properties.put("dpl.pth_06.bloom.db.fields", "[ {expected: 100, fpp: 0.01}]");
final String id = UUID.randomUUID().toString();
final String regex = "\\w{8}-\\w{4}-\\w{4}-\\w{4}-\\w{12}";
streamingTestUtil
.performDPLTest(
"index=index_A earliest=2020-01-01T00:00:00z latest=2023-01-01T00:00:00z "
+ "| teragrep exec regexextract regex " + regex
+ "| teragrep exec hdfs save overwrite=true /tmp/pth_10_hdfs/aggregatorTokenBytes/"
+ id,
aggregateFile, ds -> {
}
);
this.streamingTestUtil.setUp();
streamingTestUtil
.performDPLTest(
"| teragrep exec hdfs load /tmp/pth_10_hdfs/aggregatorTokenBytes/" + id + " "
+ "| teragrep exec bloom estimate "
+ "| teragrep exec hdfs save overwrite=true /tmp/pth_10_hdfs/aggregatorEstimate/" + id,
aggregateFile, ds -> {
}
);
this.streamingTestUtil.setUp();
streamingTestUtil
.performDPLTest(
"| teragrep exec hdfs load /tmp/pth_10_hdfs/aggregatorTokenBytes/" + id
+ "| join type=inner max=0 partition [| teragrep exec hdfs load /tmp/pth_10_hdfs/aggregatorEstimate/"
+ id,
aggregateFile, ds -> {
Config config = ConfigFactory.parseProperties(properties);
TeragrepBloomStep step = new TeragrepBloomStep(
config,
TeragrepBloomStep.BloomMode.AGGREGATE,
"tokens",
"bloomfilter",
"R_estimate(tokens)"
);
ds = step.aggregate(ds);
List<byte[]> listOfResult = ds
.select("bloomfilter")
.collectAsList()
.stream()
.map(r -> (byte[]) r.get(0))
.collect(Collectors.toList());
Assertions.assertEquals(2, listOfResult.size());
BloomFilter filter1 = Assertions
.assertDoesNotThrow(
() -> BloomFilter.readFrom(new ByteArrayInputStream(listOfResult.get(0)))
);
BloomFilter filter2 = Assertions
.assertDoesNotThrow(
() -> BloomFilter.readFrom(new ByteArrayInputStream(listOfResult.get(1)))
);
// should only find tokens that match the regex
Assertions.assertTrue(filter1.mightContain("bc113100-b859-4041-b272-88b849f6d6db"));
Assertions.assertFalse(filter1.mightContain("962e5f8c-fffe-4ea6-a164-b39e0ce4ceb4"));
Assertions.assertFalse(filter1.mightContain("userid"));
Assertions.assertFalse(filter1.mightContain("userid="));
Assertions.assertFalse(filter1.mightContain("uuid"));
Assertions.assertFalse(filter1.mightContain("uuid="));
Assertions.assertFalse(filter1.mightContain("uuid=962e5f8c-fffe-4ea6-a164-b39e0ce4ceb4"));
// check that filter 2 found both UUIDs
Assertions.assertTrue(filter2.mightContain("bc113100-b859-4041-b272-88b849f6d6db"));
Assertions.assertTrue(filter2.mightContain("962e5f8c-fffe-4ea6-a164-b39e0ce4ceb4"));
}
);
}
}
34 changes: 34 additions & 0 deletions src/test/java/com/teragrep/pth10/TeragrepTransformationTest.java
@@ -757,4 +757,38 @@ public void tgRegexExtractionMissingRegexExceptionTest() {
});
Assertions.assertTrue(exception.getMessage().contains("Missing regex parameter"));
}

@Test
@DisabledIfSystemProperty(
named = "skipSparkTest",
matches = "true"
)
public void tgHdfsSaveAfterBloomEstimateTestUsingRegexExtract() {
final String id = UUID.randomUUID().toString();
streamingTestUtil
.performDPLTest(
"index=index_A | teragrep exec regexextract regex \\d+ | teragrep exec bloom estimate | teragrep exec hdfs save overwrite=true /tmp/pth_10_hdfs/regexextract/"
+ id,
testFile, ds -> {
List<String> listOfResult = ds
.select("estimate(tokens)")
.collectAsList()
.stream()
.map(r -> r.getAs(0).toString())
.collect(Collectors.toList());
Assertions.assertEquals(Collections.singletonList("5"), listOfResult);
}
);
this.streamingTestUtil.setUp();
streamingTestUtil
.performDPLTest("| teragrep exec hdfs load /tmp/pth_10_hdfs/regexextract/" + id, testFile, ds -> {
List<String> listOfResult = ds
.select("estimate(tokens)")
.collectAsList()
.stream()
.map(r -> r.getAs(0).toString())
.collect(Collectors.toList());
Assertions.assertEquals(Collections.singletonList("5"), listOfResult);
});
}
}