diff --git a/core/src/main/java/com/alibaba/alink/operator/batch/dataproc/CsvToColumnsBatchOp.java b/core/src/main/java/com/alibaba/alink/operator/batch/dataproc/CsvToColumnsBatchOp.java new file mode 100644 index 000000000..0aa6fb27c --- /dev/null +++ b/core/src/main/java/com/alibaba/alink/operator/batch/dataproc/CsvToColumnsBatchOp.java @@ -0,0 +1,22 @@ +package com.alibaba.alink.operator.batch.dataproc; + +import com.alibaba.alink.operator.batch.utils.MapBatchOp; +import com.alibaba.alink.operator.common.dataproc.StringToColumnsMappers; +import com.alibaba.alink.params.dataproc.CsvToColumnsParams; +import org.apache.flink.ml.api.misc.param.Params; + +/** + * CsvToColumnsBatchOp parses a CSV formatted string column to several columns, + * according to the specified schema. + */ +public final class CsvToColumnsBatchOp extends MapBatchOp + implements CsvToColumnsParams { + + public CsvToColumnsBatchOp() { + this(null); + } + + public CsvToColumnsBatchOp(Params params) { + super(StringToColumnsMappers.CsvToColumnsMapper::new, params); + } +} diff --git a/core/src/main/java/com/alibaba/alink/operator/batch/dataproc/JsonToColumnsBatchOp.java b/core/src/main/java/com/alibaba/alink/operator/batch/dataproc/JsonToColumnsBatchOp.java new file mode 100644 index 000000000..8ba1f0126 --- /dev/null +++ b/core/src/main/java/com/alibaba/alink/operator/batch/dataproc/JsonToColumnsBatchOp.java @@ -0,0 +1,22 @@ +package com.alibaba.alink.operator.batch.dataproc; + +import com.alibaba.alink.operator.batch.utils.MapBatchOp; +import com.alibaba.alink.operator.common.dataproc.StringToColumnsMappers; +import com.alibaba.alink.params.dataproc.JsonToColumnsParams; +import org.apache.flink.ml.api.misc.param.Params; + +/** + * JsonToColumnsBatchOp parses a json string column to several columns, + * according to the specified schema. 
+ */ +public final class JsonToColumnsBatchOp extends MapBatchOp + implements JsonToColumnsParams { + + public JsonToColumnsBatchOp() { + this(null); + } + + public JsonToColumnsBatchOp(Params params) { + super(StringToColumnsMappers.JsonToColumnsMapper::new, params); + } +} diff --git a/core/src/main/java/com/alibaba/alink/operator/batch/dataproc/KvToColumnsBatchOp.java b/core/src/main/java/com/alibaba/alink/operator/batch/dataproc/KvToColumnsBatchOp.java new file mode 100644 index 000000000..882a5909b --- /dev/null +++ b/core/src/main/java/com/alibaba/alink/operator/batch/dataproc/KvToColumnsBatchOp.java @@ -0,0 +1,24 @@ +package com.alibaba.alink.operator.batch.dataproc; + +import com.alibaba.alink.operator.batch.utils.MapBatchOp; +import com.alibaba.alink.operator.common.dataproc.StringToColumnsMappers; +import com.alibaba.alink.params.dataproc.KvToColumnsParams; +import org.apache.flink.ml.api.misc.param.Params; + +/** + * KvToColumnsBatchOp parses a key-value string column to several columns, + * according to the specified schema. + *

+ * The key-value string has format like: f1=val1,f2=val2,f3=val3 + */ +public final class KvToColumnsBatchOp extends MapBatchOp + implements KvToColumnsParams { + + public KvToColumnsBatchOp() { + this(null); + } + + public KvToColumnsBatchOp(Params params) { + super(StringToColumnsMappers.KvToColumnsMapper::new, params); + } +} diff --git a/core/src/main/java/com/alibaba/alink/operator/common/dataproc/StringParsers.java b/core/src/main/java/com/alibaba/alink/operator/common/dataproc/StringParsers.java new file mode 100644 index 000000000..365e3bdfa --- /dev/null +++ b/core/src/main/java/com/alibaba/alink/operator/common/dataproc/StringParsers.java @@ -0,0 +1,182 @@ +package com.alibaba.alink.operator.common.dataproc; + +import com.alibaba.alink.common.utils.JsonConverter; +import com.jayway.jsonpath.JsonPath; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.types.Row; +import org.apache.flink.types.parser.FieldParser; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.Map; + +public class StringParsers { + + interface StringParser { + /** + * Parse a text line to a `Row`. If some of fields are missing or mal-formatted, then null is padded. + * + * @param line The line to parse. + * @return The resulted Row. Tuple.f0 indicates whether we successfully parse all fields, Tuple.f1 is the parsed result. + */ + Tuple2 parse(String line); + } + + /** + * JsonParser parse a json string to a {@link Row}. 
+ */ + public static class JsonParser implements StringParser { + private String[] fieldNames; + private FieldParser[] parsers; + private boolean[] isString; + + public JsonParser(String[] fieldNames, TypeInformation[] fieldTypes) { + this.fieldNames = fieldNames; + Preconditions.checkArgument(fieldNames.length == fieldTypes.length); + this.isString = new boolean[fieldNames.length]; + this.parsers = new FieldParser[fieldNames.length]; + + for (int i = 0; i < fieldTypes.length; i++) { + parsers[i] = getFieldParser(fieldTypes[i].getTypeClass()); + isString[i] = fieldTypes[i].equals(Types.STRING); + } + } + + @Override + public Tuple2 parse(String line) { + Row row = new Row(fieldNames.length); + boolean succ = true; + for (int i = 0; i < fieldNames.length; i++) { + Object o = JsonPath.read(line, "$." + fieldNames[i]); + if (o == null) { + succ = false; + continue; + } + if (!(o instanceof String)) { + o = JsonConverter.toJson(o); + } + Tuple2 parsed = parseField(parsers[i], (String) o, isString[i]); + if (!parsed.f0) { + succ = false; + } + row.setField(i, parsed.f1); + } + return Tuple2.of(succ, row); + } + + + } + + /** + * KvParser parse a key-value string to a {@link Row}. 
+ */ + public static class KvParser implements StringParser { + private String[] fieldNames; + private FieldParser[] parsers; + private boolean[] isString; + private String colDelimiter; + private String valDelimiter; + private transient Map keyToFieldIdx; + + public KvParser(String[] fieldNames, TypeInformation[] fieldTypes, String colDelimiter, String valDelimiter) { + this.fieldNames = fieldNames; + Preconditions.checkArgument(fieldNames.length == fieldTypes.length); + this.isString = new boolean[fieldNames.length]; + this.parsers = new FieldParser[fieldNames.length]; + + for (int i = 0; i < fieldTypes.length; i++) { + parsers[i] = getFieldParser(fieldTypes[i].getTypeClass()); + isString[i] = fieldTypes[i].equals(Types.STRING); + } + this.colDelimiter = colDelimiter; + this.valDelimiter = valDelimiter; + + keyToFieldIdx = new HashMap<>(); + for (int i = 0; i < fieldNames.length; i++) { + keyToFieldIdx.put(fieldNames[i], i); + } + } + + @Override + public Tuple2 parse(String line) { + Row row = new Row(fieldNames.length); + String[] fields = line.split(colDelimiter); + boolean succ = true; + int cnt = 0; + + for (int i = 0; i < fields.length; i++) { + if (StringUtils.isNullOrWhitespaceOnly(fields[i])) { + succ = false; + continue; + } + String[] kv = fields[i].split(valDelimiter); + if (kv.length < 2) { + succ = false; + continue; + } + Integer fidx = keyToFieldIdx.get(kv[0]); + if (fidx == null) { + continue; + } + Tuple2 parsed = parseField(parsers[i], kv[1], isString[i]); + if (!parsed.f0) { + succ = false; + } + row.setField(i, parsed.f1); + cnt++; + } + + if (cnt < fieldNames.length) { + succ = false; + } + return Tuple2.of(succ, row); + } + } + + /** + * CsvParser parse a CSV formatted text line to a {@link Row}. 
+ */ + public static class CsvParser implements StringParser { + com.alibaba.alink.operator.common.io.csv.CsvParser parser; + + public CsvParser(TypeInformation[] types, String fieldDelim, @Nullable Character quoteChar) { + this.parser = new com.alibaba.alink.operator.common.io.csv.CsvParser(types, fieldDelim, quoteChar); + } + + @Override + public Tuple2 parse(String line) { + return parser.parse(line); + } + } + + static FieldParser getFieldParser(Class typeClazz) { + Class> parserType = FieldParser.getParserForType(typeClazz); + if (parserType == null) { + throw new RuntimeException("No parser available for type '" + typeClazz.getName() + "'."); + } + return InstantiationUtil.instantiate(parserType, FieldParser.class); + } + + static Tuple2 parseField(FieldParser parser, String token, boolean isStringField) { + if (isStringField) { + return Tuple2.of(true, token); + } else { + if (StringUtils.isNullOrWhitespaceOnly(token)) { + return Tuple2.of(false, null); + } + byte[] bytes = token.getBytes(); + parser.resetErrorStateAndParse(bytes, 0, bytes.length, new byte[]{0}, null); + FieldParser.ParseErrorState errorState = parser.getErrorState(); + if (errorState != FieldParser.ParseErrorState.NONE) { + return Tuple2.of(false, null); + } else { + return Tuple2.of(true, parser.getLastResult()); + } + } + } +} diff --git a/core/src/main/java/com/alibaba/alink/operator/common/dataproc/StringToColumnsMappers.java b/core/src/main/java/com/alibaba/alink/operator/common/dataproc/StringToColumnsMappers.java new file mode 100644 index 000000000..3b71d53eb --- /dev/null +++ b/core/src/main/java/com/alibaba/alink/operator/common/dataproc/StringToColumnsMappers.java @@ -0,0 +1,110 @@ +package com.alibaba.alink.operator.common.dataproc; + +import com.alibaba.alink.common.mapper.Mapper; +import com.alibaba.alink.common.utils.OutputColsHelper; +import com.alibaba.alink.common.utils.TableUtil; +import com.alibaba.alink.operator.common.io.csv.CsvUtil; +import 
com.alibaba.alink.params.dataproc.CsvToColumnsParams; +import com.alibaba.alink.params.dataproc.JsonToColumnsParams; +import com.alibaba.alink.params.dataproc.KvToColumnsParams; +import com.alibaba.alink.params.dataproc.StringToColumnsParams; +import com.alibaba.alink.params.dataproc.vector.VectorToColumnsParams; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.ml.api.misc.param.Params; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; + +public class StringToColumnsMappers { + + public static abstract class BaseStringToColumnsMapper extends Mapper { + + private OutputColsHelper outputColsHelper; + private int idx; + private String[] fieldNames; + private TypeInformation[] fieldTypes; + private transient StringParsers.StringParser parser; + private transient StringToColumnsParams.HandleInvalid handleInvalid; + private transient boolean hasOpen; + + public BaseStringToColumnsMapper(TableSchema dataSchema, Params params) { + super(dataSchema, params); + String selectedColName = this.params.get(JsonToColumnsParams.SELECTED_COL); + idx = TableUtil.findColIndex(dataSchema.getFieldNames(), selectedColName); + String schemaStr = this.params.get(JsonToColumnsParams.SCHEMA_STR); + TableSchema schema = CsvUtil.schemaStr2Schema(schemaStr); + this.fieldNames = schema.getFieldNames(); + this.fieldTypes = schema.getFieldTypes(); + this.outputColsHelper = new OutputColsHelper(dataSchema, schema.getFieldNames(), schema.getFieldTypes(), + this.params.get(VectorToColumnsParams.RESERVED_COLS)); + } + + public void open() { + this.parser = getParser(fieldNames, fieldTypes, params); + this.handleInvalid = params.get(StringToColumnsParams.HANDLE_INVALID); + } + + @Override + public Row map(Row row) { + if (!hasOpen) { + open(); + hasOpen = true; + } + String text = (String) row.getField(idx); + Tuple2 parsed = parser.parse(text); + + if (!parsed.f0 && handleInvalid == 
StringToColumnsParams.HandleInvalid.ERROR) { + throw new RuntimeException("Fail to parse \"" + text + "\""); + } + return outputColsHelper.getResultRow(row, parsed.f1); + } + + /** + * Get the output data schema. + */ + @Override + public TableSchema getOutputSchema() { + return outputColsHelper.getResultSchema(); + } + + abstract protected StringParsers.StringParser getParser( + String[] fieldNames, TypeInformation[] fieldTypes, Params params); + } + + public static class CsvToColumnsMapper extends BaseStringToColumnsMapper { + public CsvToColumnsMapper(TableSchema dataSchema, Params params) { + super(dataSchema, params); + } + + @Override + protected StringParsers.StringParser getParser(String[] fieldNames, TypeInformation[] fieldTypes, Params params) { + String fieldDelim = params.get(CsvToColumnsParams.FIELD_DELIMITER); + Character quoteChar = params.get(CsvToColumnsParams.QUOTE_CHAR); + return new StringParsers.CsvParser(fieldTypes, fieldDelim, quoteChar); + } + } + + public static class JsonToColumnsMapper extends BaseStringToColumnsMapper { + public JsonToColumnsMapper(TableSchema dataSchema, Params params) { + super(dataSchema, params); + } + + @Override + protected StringParsers.StringParser getParser(String[] fieldNames, TypeInformation[] fieldTypes, Params params) { + return new StringParsers.JsonParser(fieldNames, fieldTypes); + } + } + + public static class KvToColumnsMapper extends BaseStringToColumnsMapper { + public KvToColumnsMapper(TableSchema dataSchema, Params params) { + super(dataSchema, params); + } + + @Override + protected StringParsers.StringParser getParser(String[] fieldNames, TypeInformation[] fieldTypes, Params params) { + String colDelim = params.get(KvToColumnsParams.COL_DELIMITER); + String valDelim = params.get(KvToColumnsParams.VAL_DELIMITER); + return new StringParsers.KvParser(fieldNames, fieldTypes, colDelim, valDelim); + } + } +} diff --git a/core/src/main/java/com/alibaba/alink/operator/common/io/csv/CsvParser.java 
b/core/src/main/java/com/alibaba/alink/operator/common/io/csv/CsvParser.java index dd6436ff1..c024a402c 100644 --- a/core/src/main/java/com/alibaba/alink/operator/common/io/csv/CsvParser.java +++ b/core/src/main/java/com/alibaba/alink/operator/common/io/csv/CsvParser.java @@ -2,10 +2,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.types.Row; import org.apache.flink.types.parser.FieldParser; import org.apache.flink.util.InstantiationUtil; -import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; import javax.annotation.Nullable; @@ -19,7 +19,6 @@ public class CsvParser { private Character quoteChar; private String quoteString; private String escapedQuote; - private Row reused; private boolean enableQuote; private FieldParser[] parsers; private boolean[] isString; @@ -36,7 +35,6 @@ public CsvParser(TypeInformation[] types, String fieldDelim, @Nullable Character this.fieldDelim = fieldDelim; this.lenFieldDelim = this.fieldDelim.length(); this.quoteChar = quoteChar; - this.reused = new Row(types.length); this.enableQuote = quoteChar != null; this.parsers = new FieldParser[types.length]; this.isString = new boolean[types.length]; @@ -63,17 +61,20 @@ public CsvParser(TypeInformation[] types, String fieldDelim, @Nullable Character * @param line The text line to parse. * @return The parsed result. 
*/ - public Row parse(String line) { - for (int i = 0; i < reused.getArity(); i++) { - reused.setField(i, null); + public Tuple2 parse(String line) { + Row output = new Row(this.parsers.length); + for (int i = 0; i < output.getArity(); i++) { + output.setField(i, null); } if (line == null || line.isEmpty()) { - return reused; + return Tuple2.of(false, output); } int startPos = 0; + boolean succ = true; final int limit = line.length(); - for (int i = 0; i < reused.getArity(); i++) { - if (startPos >= limit) { + for (int i = 0; i < output.getArity(); i++) { + if (startPos > limit) { + succ = false; break; } boolean isStringCol = isString[i]; @@ -83,14 +84,21 @@ public Row parse(String line) { } String token = line.substring(startPos, delimPos); if (!token.isEmpty()) { - reused.setField(i, parseField(parsers[i], token, isStringCol)); + Tuple2 parsed = parseField(parsers[i], token, isStringCol); + if (!parsed.f0) { + succ = false; + } + output.setField(i, parsed.f1); } startPos = delimPos + this.lenFieldDelim; } - return reused; + return Tuple2.of(succ, output); } private int findNextDelimPos(String line, int startPos, int limit, boolean isStringCol) { + if (startPos >= limit) { + return -1; + } if (!enableQuote || !isStringCol) { return line.indexOf(fieldDelim, startPos); } @@ -116,31 +124,35 @@ private int findNextDelimPos(String line, int startPos, int limit, boolean isStr pos++; } if (pos >= limit) { - throw new RuntimeException("Unterminated quote."); + return -1; } return line.indexOf(fieldDelim, pos + 1); } - private Object parseField(FieldParser parser, String token, boolean isStringField) { + private Tuple2 parseField(FieldParser parser, String token, boolean isStringField) { if (isStringField) { if (!enableQuote || token.charAt(0) != quoteChar) { - return token; + return Tuple2.of(true, token); + } + String content; + if (token.endsWith(quoteChar.toString())) { + content = token.substring(1, token.length() - 1); + } else { + content = token.substring(1, 
token.length()); } - Preconditions.checkArgument(token.endsWith(quoteChar.toString()), - "String not end with quote: " + String.format("\"%s\"", token)); - String content = token.substring(1, token.length() - 1); - return content.replace(escapedQuote, quoteString); + return Tuple2.of(true, content.replace(escapedQuote, quoteString)); } else { if (StringUtils.isNullOrWhitespaceOnly(token)) { - return null; + return Tuple2.of(true, null); } byte[] bytes = token.getBytes(); parser.resetErrorStateAndParse(bytes, 0, bytes.length, fieldDelim.getBytes(), null); FieldParser.ParseErrorState errorState = parser.getErrorState(); if (errorState != FieldParser.ParseErrorState.NONE) { - throw new RuntimeException("Fail to parse token: " + String.format("\"%s\"", token)); + return Tuple2.of(false, null); + } else { + return Tuple2.of(true, parser.getLastResult()); } - return parser.getLastResult(); } } } diff --git a/core/src/main/java/com/alibaba/alink/operator/common/io/csv/CsvUtil.java b/core/src/main/java/com/alibaba/alink/operator/common/io/csv/CsvUtil.java index dfd6a9496..7b8cb7a51 100644 --- a/core/src/main/java/com/alibaba/alink/operator/common/io/csv/CsvUtil.java +++ b/core/src/main/java/com/alibaba/alink/operator/common/io/csv/CsvUtil.java @@ -1,10 +1,10 @@ package com.alibaba.alink.operator.common.io.csv; import com.alibaba.alink.operator.common.io.types.FlinkTypeConverter; - import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.TableSchema; import org.apache.flink.types.Row; @@ -99,7 +99,11 @@ public void flatMap(Row value, Collector out) throws Exception { out.collect(emptyRow); } } else { - out.collect(parser.parse(line)); + Tuple2 parsed = parser.parse(line); + if (!parsed.f0) { + throw new 
RuntimeException("Fail to parse line \"" + line + "\""); + } + out.collect(parsed.f1); } } } @@ -149,126 +153,4 @@ public static String[] getColNames(String schemaStr) { public static TypeInformation[] getColTypes(String schemaStr) { return schemaStr2Schema(schemaStr).getFieldTypes(); } - - /** - * Parse the escape chars and unicode chars from a string, replace them with the chars they represents. - *

- * For example: - *

    - *
  • "\\t" -> '\t' - *
  • "\\001" -> '\001' - *
  • "\\u0001" -> '\u0001' - *
- *

- * The escaped char list: \b, \f, \n, \r, \t, \\, \', \". - */ - public static String unEscape(String s) { - if (s == null) { - return null; - } - - if (s.length() == 0) { - return s; - } - - StringBuilder sbd = new StringBuilder(); - - for (int i = 0; i < s.length(); ) { - int flag = extractEscape(s, i, sbd); - if (flag <= 0) { - sbd.append(s.charAt(i)); - i++; - } else { - i += flag; - } - } - - return sbd.toString(); - } - - /** - * Parse one escape char. - * - * @param s The string to parse. - * @param pos Starting position of the string. - * @param sbd String builder to accept the parsed result. - * @return The length of the part of the string that is parsed. - */ - private static int extractEscape(String s, int pos, StringBuilder sbd) { - if (s.charAt(pos) != '\\') { - return 0; - } - pos++; - if (pos >= s.length()) { - return 0; - } - char c = s.charAt(pos); - - if (c >= '0' && c <= '7') { - int digit = 1; - int i; - for (i = 0; i < 2; i++) { - if (pos + 1 + i >= s.length()) { - break; - } - if (s.charAt(pos + 1 + i) >= '0' && s.charAt(pos + 1 + i) <= '7') { - digit++; - } else { - break; - } - } - int n = Integer.valueOf(s.substring(pos, pos + digit), 8); - sbd.append(Character.toChars(n)); - return digit + 1; - } else if (c == 'u') { // unicode - pos++; - int digit = 0; - for (int i = 0; i < 4; i++) { - if (pos + i >= s.length()) { - break; - } - char ch = s.charAt(pos + i); - if ((ch >= '0' && ch <= '9') || ((ch >= 'a' && ch <= 'f')) || ((ch >= 'A' && ch <= 'F'))) { - digit++; - } else { - break; - } - } - if (digit == 0) { - return 0; - } - int n = Integer.valueOf(s.substring(pos, pos + digit), 16); - sbd.append(Character.toChars(n)); - return digit + 2; - } else { - switch (c) { - case '\\': - sbd.append('\\'); - return 2; - case '\'': - sbd.append('\''); - return 2; - case '\"': - sbd.append('"'); - return 2; - case 'r': - sbd.append('\r'); - return 2; - case 'f': - sbd.append('\f'); - return 2; - case 't': - sbd.append('\t'); - return 2; - case 'n': 
- sbd.append('\n'); - return 2; - case 'b': - sbd.append('\b'); - return 2; - default: - return 0; - } - } - } } diff --git a/core/src/main/java/com/alibaba/alink/operator/stream/dataproc/CsvToColumnsStreamOp.java b/core/src/main/java/com/alibaba/alink/operator/stream/dataproc/CsvToColumnsStreamOp.java new file mode 100644 index 000000000..dbaf8eec0 --- /dev/null +++ b/core/src/main/java/com/alibaba/alink/operator/stream/dataproc/CsvToColumnsStreamOp.java @@ -0,0 +1,23 @@ +package com.alibaba.alink.operator.stream.dataproc; + +import com.alibaba.alink.operator.common.dataproc.StringToColumnsMappers; +import com.alibaba.alink.operator.stream.utils.MapStreamOp; +import com.alibaba.alink.params.dataproc.CsvToColumnsParams; +import org.apache.flink.ml.api.misc.param.Params; + +/** + * CsvToColumnsStreamOp parses a CSV formatted string column to several columns, + * according to the specified schema. + */ +public final class CsvToColumnsStreamOp extends MapStreamOp + implements CsvToColumnsParams { + + public CsvToColumnsStreamOp() { + this(null); + } + + public CsvToColumnsStreamOp(Params params) { + super(StringToColumnsMappers.CsvToColumnsMapper::new, params); + } + +} diff --git a/core/src/main/java/com/alibaba/alink/operator/stream/dataproc/JsonToColumnsStreamOp.java b/core/src/main/java/com/alibaba/alink/operator/stream/dataproc/JsonToColumnsStreamOp.java new file mode 100644 index 000000000..37b98654b --- /dev/null +++ b/core/src/main/java/com/alibaba/alink/operator/stream/dataproc/JsonToColumnsStreamOp.java @@ -0,0 +1,23 @@ +package com.alibaba.alink.operator.stream.dataproc; + +import com.alibaba.alink.operator.common.dataproc.StringToColumnsMappers; +import com.alibaba.alink.operator.stream.utils.MapStreamOp; +import com.alibaba.alink.params.dataproc.JsonToColumnsParams; +import org.apache.flink.ml.api.misc.param.Params; + +/** + * JsonToColumnsStreamOp parses a json string column to several columns, + * according to the specified schema. 
+ */ +public final class JsonToColumnsStreamOp extends MapStreamOp + implements JsonToColumnsParams { + + public JsonToColumnsStreamOp() { + this(null); + } + + public JsonToColumnsStreamOp(Params params) { + super(StringToColumnsMappers.JsonToColumnsMapper::new, params); + } + +} diff --git a/core/src/main/java/com/alibaba/alink/operator/stream/dataproc/KvToColumnsStreamOp.java b/core/src/main/java/com/alibaba/alink/operator/stream/dataproc/KvToColumnsStreamOp.java new file mode 100644 index 000000000..10824d5bb --- /dev/null +++ b/core/src/main/java/com/alibaba/alink/operator/stream/dataproc/KvToColumnsStreamOp.java @@ -0,0 +1,25 @@ +package com.alibaba.alink.operator.stream.dataproc; + +import com.alibaba.alink.operator.common.dataproc.StringToColumnsMappers; +import com.alibaba.alink.operator.stream.utils.MapStreamOp; +import com.alibaba.alink.params.dataproc.KvToColumnsParams; +import org.apache.flink.ml.api.misc.param.Params; + +/** + * KvToColumnsStreamOp parses a key-value string column to several columns, + * according to the specified schema. + *

+ * The key-value string has format like: f1=val1,f2=val2,f3=val3 + */ +public final class KvToColumnsStreamOp extends MapStreamOp + implements KvToColumnsParams { + + public KvToColumnsStreamOp() { + this(null); + } + + public KvToColumnsStreamOp(Params params) { + super(StringToColumnsMappers.KvToColumnsMapper::new, params); + } + +} diff --git a/core/src/main/java/com/alibaba/alink/params/dataproc/CsvToColumnsParams.java b/core/src/main/java/com/alibaba/alink/params/dataproc/CsvToColumnsParams.java new file mode 100644 index 000000000..11d485418 --- /dev/null +++ b/core/src/main/java/com/alibaba/alink/params/dataproc/CsvToColumnsParams.java @@ -0,0 +1,8 @@ +package com.alibaba.alink.params.dataproc; + +import com.alibaba.alink.params.io.HasFieldDelimiterDvComma; +import com.alibaba.alink.params.io.HasQuoteCharDefaultAsDoubleQuote; + +public interface CsvToColumnsParams extends + StringToColumnsParams, HasFieldDelimiterDvComma, HasQuoteCharDefaultAsDoubleQuote { +} \ No newline at end of file diff --git a/core/src/main/java/com/alibaba/alink/params/dataproc/JsonToColumnsParams.java b/core/src/main/java/com/alibaba/alink/params/dataproc/JsonToColumnsParams.java new file mode 100644 index 000000000..75353192c --- /dev/null +++ b/core/src/main/java/com/alibaba/alink/params/dataproc/JsonToColumnsParams.java @@ -0,0 +1,5 @@ +package com.alibaba.alink.params.dataproc; + +public interface JsonToColumnsParams extends + StringToColumnsParams { +} \ No newline at end of file diff --git a/core/src/main/java/com/alibaba/alink/params/dataproc/KvToColumnsParams.java b/core/src/main/java/com/alibaba/alink/params/dataproc/KvToColumnsParams.java new file mode 100644 index 000000000..1d5e3c11d --- /dev/null +++ b/core/src/main/java/com/alibaba/alink/params/dataproc/KvToColumnsParams.java @@ -0,0 +1,8 @@ +package com.alibaba.alink.params.dataproc; + +import com.alibaba.alink.params.shared.delimiter.HasColDelimiterDvComma; +import 
com.alibaba.alink.params.shared.delimiter.HasValDelimiterDvColon; + +public interface KvToColumnsParams extends + StringToColumnsParams, HasColDelimiterDvComma, HasValDelimiterDvColon { +} \ No newline at end of file diff --git a/core/src/main/java/com/alibaba/alink/params/dataproc/StringToColumnsParams.java b/core/src/main/java/com/alibaba/alink/params/dataproc/StringToColumnsParams.java new file mode 100644 index 000000000..9220836b1 --- /dev/null +++ b/core/src/main/java/com/alibaba/alink/params/dataproc/StringToColumnsParams.java @@ -0,0 +1,48 @@ +package com.alibaba.alink.params.dataproc; + +import com.alibaba.alink.params.ParamUtil; +import com.alibaba.alink.params.io.HasSchemaStr; +import com.alibaba.alink.params.shared.colname.HasReservedCols; +import com.alibaba.alink.params.shared.colname.HasSelectedCol; +import org.apache.flink.ml.api.misc.param.ParamInfo; +import org.apache.flink.ml.api.misc.param.ParamInfoFactory; + +import java.io.Serializable; + +public interface StringToColumnsParams extends + HasSelectedCol, + HasSchemaStr, + HasReservedCols { + + ParamInfo HANDLE_INVALID = ParamInfoFactory + .createParamInfo("handleInvalid", HandleInvalid.class) + .setDescription("handle invalid strategy") + .setHasDefaultValue(HandleInvalid.SKIP) + .build(); + + default HandleInvalid getHandleInvalid() { + return get(HANDLE_INVALID); + } + + default T setHandleInvalid(HandleInvalid value) { + return set(HANDLE_INVALID, value); + } + + default T setHandleInvalid(String value) { + return set(HANDLE_INVALID, ParamUtil.searchEnum(HANDLE_INVALID, value)); + } + + /** + * Strategy to handle parse exception. + */ + enum HandleInvalid implements Serializable { + /** + * Raise exception. + */ + ERROR, + /** + * Pad with null. 
+ */ + SKIP + } +} \ No newline at end of file diff --git a/core/src/main/java/com/alibaba/alink/pipeline/ModelExporterUtils.java b/core/src/main/java/com/alibaba/alink/pipeline/ModelExporterUtils.java index 02794a46b..09b33f045 100644 --- a/core/src/main/java/com/alibaba/alink/pipeline/ModelExporterUtils.java +++ b/core/src/main/java/com/alibaba/alink/pipeline/ModelExporterUtils.java @@ -168,7 +168,7 @@ public void open(Configuration parameters) throws Exception { @Override public Row map(Row value) throws Exception { - return parser.parse((String) value.getField(1)); + return parser.parse((String) value.getField(1)).f1; } }); diff --git a/core/src/main/java/com/alibaba/alink/pipeline/dataproc/CsvToColumns.java b/core/src/main/java/com/alibaba/alink/pipeline/dataproc/CsvToColumns.java new file mode 100644 index 000000000..b61a81e14 --- /dev/null +++ b/core/src/main/java/com/alibaba/alink/pipeline/dataproc/CsvToColumns.java @@ -0,0 +1,23 @@ +package com.alibaba.alink.pipeline.dataproc; + +import com.alibaba.alink.operator.common.dataproc.StringToColumnsMappers; +import com.alibaba.alink.params.dataproc.CsvToColumnsParams; +import com.alibaba.alink.pipeline.MapTransformer; +import org.apache.flink.ml.api.misc.param.Params; + +/** + * CsvToColumns parses a CSV formatted string column to several columns, + * according to the specified schema. 
+ */ +public class CsvToColumns extends MapTransformer + implements CsvToColumnsParams { + + public CsvToColumns() { + this(null); + } + + public CsvToColumns(Params params) { + super(StringToColumnsMappers.CsvToColumnsMapper::new, params); + } + +} diff --git a/core/src/main/java/com/alibaba/alink/pipeline/dataproc/JsonToColumns.java b/core/src/main/java/com/alibaba/alink/pipeline/dataproc/JsonToColumns.java new file mode 100644 index 000000000..7f373913f --- /dev/null +++ b/core/src/main/java/com/alibaba/alink/pipeline/dataproc/JsonToColumns.java @@ -0,0 +1,23 @@ +package com.alibaba.alink.pipeline.dataproc; + +import com.alibaba.alink.operator.common.dataproc.StringToColumnsMappers; +import com.alibaba.alink.params.dataproc.JsonToColumnsParams; +import com.alibaba.alink.pipeline.MapTransformer; +import org.apache.flink.ml.api.misc.param.Params; + +/** + * JsonToColumns parses a json string column to several columns, + * according to the specified schema. + */ +public class JsonToColumns extends MapTransformer + implements JsonToColumnsParams { + + public JsonToColumns() { + this(null); + } + + public JsonToColumns(Params params) { + super(StringToColumnsMappers.JsonToColumnsMapper::new, params); + } + +} diff --git a/core/src/main/java/com/alibaba/alink/pipeline/dataproc/KvToColumns.java b/core/src/main/java/com/alibaba/alink/pipeline/dataproc/KvToColumns.java new file mode 100644 index 000000000..670b03ea2 --- /dev/null +++ b/core/src/main/java/com/alibaba/alink/pipeline/dataproc/KvToColumns.java @@ -0,0 +1,25 @@ +package com.alibaba.alink.pipeline.dataproc; + +import com.alibaba.alink.operator.common.dataproc.StringToColumnsMappers; +import com.alibaba.alink.params.dataproc.KvToColumnsParams; +import com.alibaba.alink.pipeline.MapTransformer; +import org.apache.flink.ml.api.misc.param.Params; + +/** + * KvToColumns parses a key-value string column to several columns, + * according to the specified schema. + *

+ * The key-value string has format like: f1=val1,f2=val2,f3=val3 + */ +public class KvToColumns extends MapTransformer + implements KvToColumnsParams { + + public KvToColumns() { + this(null); + } + + public KvToColumns(Params params) { + super(StringToColumnsMappers.KvToColumnsMapper::new, params); + } + +} diff --git a/core/src/test/java/com/alibaba/alink/operator/common/dataproc/StringParsersTest.java b/core/src/test/java/com/alibaba/alink/operator/common/dataproc/StringParsersTest.java new file mode 100644 index 000000000..985617ebd --- /dev/null +++ b/core/src/test/java/com/alibaba/alink/operator/common/dataproc/StringParsersTest.java @@ -0,0 +1,60 @@ +package com.alibaba.alink.operator.common.dataproc; + +import com.alibaba.alink.operator.common.io.csv.CsvUtil; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.types.Row; +import org.junit.Assert; +import org.junit.Test; + +public class StringParsersTest { + @Test + public void testJsonParser() throws Exception { + String jsonStr = "{\n" + + " \"media_name\": \"Titanic\",\n" + + " \"title\": \"Titanic\",\n" + + " \"compare_point\": 0.0001,\n" + + " \"spider_point\": 0.0000,\n" + + " \"search_point\": 0.6,\n" + + " \"collection_id\": 123456,\n" + + " \"media_id\": 3214\n" + + "}"; + + String schemaStr + = "media_name string, title string, compare_point double, spider_point double, search_point double, " + + "collection_id bigint, media_id bigint"; + + TableSchema schema = CsvUtil.schemaStr2Schema(schemaStr); + StringParsers.JsonParser parser = new StringParsers.JsonParser(schema.getFieldNames(), schema.getFieldTypes()); + Tuple2 parsed = parser.parse(jsonStr); + Assert.assertTrue(parsed.f0); + Assert.assertEquals(parsed.f1.getArity(), 7); + } + + @Test + public void testKvParser() throws Exception { + String kvStr = 
"f1=1,f2=2.0,f3=false,f4=val,f5=2018-09-10,f6=14:22:20,f7=2018-09-10 14:22:20"; + String schemaStr = "f1 bigint, f2 double, f3 boolean, f4 string, f5 date, f6 time, f7 timestamp"; + + TableSchema schema = CsvUtil.schemaStr2Schema(schemaStr); + StringParsers.KvParser parser = new StringParsers.KvParser(schema.getFieldNames(), schema.getFieldTypes(), ",", "="); + Tuple2<Boolean, Row> parsed = parser.parse(kvStr); + Assert.assertTrue(parsed.f0); + Assert.assertEquals(parsed.f1.getArity(), 7); + } + + @Test + public void testCsvParser() throws Exception { + StringParsers.CsvParser parser = new StringParsers.CsvParser( + new TypeInformation[]{Types.STRING, Types.STRING, Types.STRING}, "____", '"'); + Assert.assertEquals(parser.parse("hello_____world____").f1.getField(0), "hello"); + Assert.assertEquals(parser.parse("hello_____world____").f1.getField(1), "_world"); + Assert.assertEquals(parser.parse("hello_____world____").f1.getField(2), null); + Assert.assertEquals(parser.parse("\"hello_____world____\"").f1.getField(0), "hello_____world____"); + Assert.assertEquals(parser.parse("\"hello_____world____\"").f1.getField(1), null); + Assert.assertEquals(parser.parse("\"hello_____world____\"").f1.getField(2), null); + } + +} \ No newline at end of file diff --git a/core/src/test/java/com/alibaba/alink/operator/common/dataproc/StringToColumnsMappersTest.java b/core/src/test/java/com/alibaba/alink/operator/common/dataproc/StringToColumnsMappersTest.java new file mode 100644 index 000000000..9b2901dfb --- /dev/null +++ b/core/src/test/java/com/alibaba/alink/operator/common/dataproc/StringToColumnsMappersTest.java @@ -0,0 +1,33 @@ +package com.alibaba.alink.operator.common.dataproc; + +import com.alibaba.alink.operator.batch.BatchOperator; +import com.alibaba.alink.operator.batch.dataproc.CsvToColumnsBatchOp; +import com.alibaba.alink.operator.batch.source.MemSourceBatchOp; +import org.apache.flink.types.Row; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; + 
+public class StringToColumnsMappersTest { + @Test + public void testCsvToColumns() throws Exception { + Row[] rows = new Row[]{ + Row.of("pk1", "a,1.0"), + Row.of("pk2", "e,2.0"), + Row.of("pk3", ""), + Row.of("pk4", "b"), + }; + + BatchOperator data = new MemSourceBatchOp(Arrays.asList(rows), new String[]{"id", "content"}); + + BatchOperator op = new CsvToColumnsBatchOp() + .setSelectedCol("content") + .setHandleInvalid("skip") + .setSchemaStr("f1 string, f2 double"); + + BatchOperator output = data.link(op); + Assert.assertEquals(output.getColNames().length, 4); + Assert.assertEquals(output.collect().size(), 4); + } +} diff --git a/core/src/test/java/com/alibaba/alink/operator/common/io/csv/CsvFormatterTest.java b/core/src/test/java/com/alibaba/alink/operator/common/io/csv/CsvFormatterTest.java index 3196bfe99..2e0baa215 100644 --- a/core/src/test/java/com/alibaba/alink/operator/common/io/csv/CsvFormatterTest.java +++ b/core/src/test/java/com/alibaba/alink/operator/common/io/csv/CsvFormatterTest.java @@ -18,7 +18,7 @@ public void testFormatter() throws Exception { CsvFormatter formatter = new CsvFormatter(types, ",", '"'); CsvParser parser = new CsvParser(types, ",", '"'); String text = formatter.format(row); - Row parsed = parser.parse(text); + Row parsed = parser.parse(text).f1; Assert.assertEquals(parsed.getArity(), row.getArity()); for (int i = 0; i < parsed.getArity(); i++) { @@ -37,7 +37,7 @@ public void testDoublePrecision() throws Exception { new Random().nextDouble()}; for (Double v : values) { String text = formatter.format(Row.of(v)); - Row parsed = parser.parse(text); + Row parsed = parser.parse(text).f1; Double p = (Double) parsed.getField(0); Assert.assertEquals(v, p, 0.); } diff --git a/core/src/test/java/com/alibaba/alink/operator/common/io/csv/CsvParserTest.java b/core/src/test/java/com/alibaba/alink/operator/common/io/csv/CsvParserTest.java index d63f1aebc..cc7c77f79 100644 --- 
a/core/src/test/java/com/alibaba/alink/operator/common/io/csv/CsvParserTest.java +++ b/core/src/test/java/com/alibaba/alink/operator/common/io/csv/CsvParserTest.java @@ -3,45 +3,39 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.junit.Assert; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; public class CsvParserTest { - @Rule - public ExpectedException thrown = ExpectedException.none(); @Test public void testParser() throws Exception { CsvParser parser = new CsvParser(new TypeInformation[]{Types.STRING}, ",", '"'); - Assert.assertEquals(parser.parse("\"hello, world\"").getField(0), "hello, world"); - Assert.assertEquals(parser.parse("").getField(0), null); - Assert.assertEquals(parser.parse("\"\"").getField(0), ""); - Assert.assertEquals(parser.parse("\"\"\"\"\"\"").getField(0), "\"\""); + Assert.assertEquals(parser.parse("\"hello, world\"").f1.getField(0), "hello, world"); + Assert.assertEquals(parser.parse("").f1.getField(0), null); + Assert.assertEquals(parser.parse("\"\"").f1.getField(0), ""); + Assert.assertEquals(parser.parse("\"\"\"\"\"\"").f1.getField(0), "\"\""); } @Test public void testLongFieldSeparator() throws Exception { CsvParser parser = new CsvParser(new TypeInformation[]{Types.STRING, Types.STRING, Types.STRING}, "____", '"'); - Assert.assertEquals(parser.parse("hello_____world____").getField(0), "hello"); - Assert.assertEquals(parser.parse("hello_____world____").getField(1), "_world"); - Assert.assertEquals(parser.parse("hello_____world____").getField(2), null); - Assert.assertEquals(parser.parse("\"hello_____world____\"").getField(0), "hello_____world____"); - Assert.assertEquals(parser.parse("\"hello_____world____\"").getField(1), null); - Assert.assertEquals(parser.parse("\"hello_____world____\"").getField(2), null); + Assert.assertEquals(parser.parse("hello_____world____").f1.getField(0), "hello"); + 
Assert.assertEquals(parser.parse("hello_____world____").f1.getField(1), "_world"); + Assert.assertEquals(parser.parse("hello_____world____").f1.getField(2), null); + Assert.assertEquals(parser.parse("\"hello_____world____\"").f1.getField(0), "hello_____world____"); + Assert.assertEquals(parser.parse("\"hello_____world____\"").f1.getField(1), null); + Assert.assertEquals(parser.parse("\"hello_____world____\"").f1.getField(2), null); } @Test public void testMalFormatString1() throws Exception { - CsvParser parser = new CsvParser(new TypeInformation[]{Types.STRING}, ",", '"'); - thrown.expect(RuntimeException.class); - parser.parse("\"hello\" world"); // should end with quote + CsvParser parser = new CsvParser(new TypeInformation[]{Types.STRING, Types.LONG}, ",", '"'); + Assert.assertTrue(parser.parse("\"hello\" world,1").f0); } @Test public void testMalFormatString2() throws Exception { - CsvParser parser = new CsvParser(new TypeInformation[]{Types.STRING}, ",", '"'); - thrown.expect(RuntimeException.class); - parser.parse("\"hello world"); // unterminated quote + CsvParser parser = new CsvParser(new TypeInformation[]{Types.STRING, Types.LONG}, ",", '"'); + Assert.assertFalse(parser.parse("\"hello world,1").f0); } } \ No newline at end of file