Skip to content

Commit

Permalink
Add operators to parse strings in CSV, JSON and KV formats to columns…
Browse files Browse the repository at this point in the history
…. See #70
  • Loading branch information
chengscu authored and shaomeng.wang committed Apr 9, 2020
1 parent 2295531 commit 5a090a3
Show file tree
Hide file tree
Showing 22 changed files with 720 additions and 168 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.alibaba.alink.operator.batch.dataproc;

import com.alibaba.alink.operator.batch.utils.MapBatchOp;
import com.alibaba.alink.operator.common.dataproc.StringToColumnsMappers;
import com.alibaba.alink.params.dataproc.CsvToColumnsParams;
import org.apache.flink.ml.api.misc.param.Params;

/**
* CsvToColumnsBatchOp parses a CSV formatted string column to several columns,
* according to the specified schema.
*/
public final class CsvToColumnsBatchOp extends MapBatchOp<CsvToColumnsBatchOp>
implements CsvToColumnsParams<CsvToColumnsBatchOp> {

public CsvToColumnsBatchOp() {
this(null);
}

public CsvToColumnsBatchOp(Params params) {
super(StringToColumnsMappers.CsvToColumnsMapper::new, params);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.alibaba.alink.operator.batch.dataproc;

import com.alibaba.alink.operator.batch.utils.MapBatchOp;
import com.alibaba.alink.operator.common.dataproc.StringToColumnsMappers;
import com.alibaba.alink.params.dataproc.JsonToColumnsParams;
import org.apache.flink.ml.api.misc.param.Params;

/**
* JsonToColumnsBatchOp parses a json string column to several columns,
* according to the specified schema.
*/
public final class JsonToColumnsBatchOp extends MapBatchOp<JsonToColumnsBatchOp>
implements JsonToColumnsParams<JsonToColumnsBatchOp> {

public JsonToColumnsBatchOp() {
this(null);
}

public JsonToColumnsBatchOp(Params params) {
super(StringToColumnsMappers.JsonToColumnsMapper::new, params);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.alibaba.alink.operator.batch.dataproc;

import com.alibaba.alink.operator.batch.utils.MapBatchOp;
import com.alibaba.alink.operator.common.dataproc.StringToColumnsMappers;
import com.alibaba.alink.params.dataproc.KvToColumnsParams;
import org.apache.flink.ml.api.misc.param.Params;

/**
* KvToColumnsBatchOp parses a key-value string column to several columns,
* according to the specified schema.
* <p>
* The key-value string has format like: f1=val1,f2=val2,f3=val3
*/
public final class KvToColumnsBatchOp extends MapBatchOp<KvToColumnsBatchOp>
implements KvToColumnsParams<KvToColumnsBatchOp> {

public KvToColumnsBatchOp() {
this(null);
}

public KvToColumnsBatchOp(Params params) {
super(StringToColumnsMappers.KvToColumnsMapper::new, params);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package com.alibaba.alink.operator.common.dataproc;

import com.alibaba.alink.common.utils.JsonConverter;
import com.jayway.jsonpath.JsonPath;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;
import org.apache.flink.types.parser.FieldParser;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;

import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;

public class StringParsers {

interface StringParser {
/**
* Parse a text line to a `Row`. If some of fields are missing or mal-formatted, then null is padded.
*
* @param line The line to parse.
* @return The resulted Row. Tuple.f0 indicates whether we successfully parse all fields, Tuple.f1 is the parsed result.
*/
Tuple2<Boolean, Row> parse(String line);
}

/**
* JsonParser parse a json string to a {@link Row}.
*/
public static class JsonParser implements StringParser {
private String[] fieldNames;
private FieldParser<?>[] parsers;
private boolean[] isString;

public JsonParser(String[] fieldNames, TypeInformation[] fieldTypes) {
this.fieldNames = fieldNames;
Preconditions.checkArgument(fieldNames.length == fieldTypes.length);
this.isString = new boolean[fieldNames.length];
this.parsers = new FieldParser[fieldNames.length];

for (int i = 0; i < fieldTypes.length; i++) {
parsers[i] = getFieldParser(fieldTypes[i].getTypeClass());
isString[i] = fieldTypes[i].equals(Types.STRING);
}
}

@Override
public Tuple2<Boolean, Row> parse(String line) {
Row row = new Row(fieldNames.length);
boolean succ = true;
for (int i = 0; i < fieldNames.length; i++) {
Object o = JsonPath.read(line, "$." + fieldNames[i]);
if (o == null) {
succ = false;
continue;
}
if (!(o instanceof String)) {
o = JsonConverter.toJson(o);
}
Tuple2<Boolean, Object> parsed = parseField(parsers[i], (String) o, isString[i]);
if (!parsed.f0) {
succ = false;
}
row.setField(i, parsed.f1);
}
return Tuple2.of(succ, row);
}


}

/**
* KvParser parse a key-value string to a {@link Row}.
*/
public static class KvParser implements StringParser {
private String[] fieldNames;
private FieldParser<?>[] parsers;
private boolean[] isString;
private String colDelimiter;
private String valDelimiter;
private transient Map<String, Integer> keyToFieldIdx;

public KvParser(String[] fieldNames, TypeInformation[] fieldTypes, String colDelimiter, String valDelimiter) {
this.fieldNames = fieldNames;
Preconditions.checkArgument(fieldNames.length == fieldTypes.length);
this.isString = new boolean[fieldNames.length];
this.parsers = new FieldParser[fieldNames.length];

for (int i = 0; i < fieldTypes.length; i++) {
parsers[i] = getFieldParser(fieldTypes[i].getTypeClass());
isString[i] = fieldTypes[i].equals(Types.STRING);
}
this.colDelimiter = colDelimiter;
this.valDelimiter = valDelimiter;

keyToFieldIdx = new HashMap<>();
for (int i = 0; i < fieldNames.length; i++) {
keyToFieldIdx.put(fieldNames[i], i);
}
}

@Override
public Tuple2<Boolean, Row> parse(String line) {
Row row = new Row(fieldNames.length);
String[] fields = line.split(colDelimiter);
boolean succ = true;
int cnt = 0;

for (int i = 0; i < fields.length; i++) {
if (StringUtils.isNullOrWhitespaceOnly(fields[i])) {
succ = false;
continue;
}
String[] kv = fields[i].split(valDelimiter);
if (kv.length < 2) {
succ = false;
continue;
}
Integer fidx = keyToFieldIdx.get(kv[0]);
if (fidx == null) {
continue;
}
Tuple2<Boolean, Object> parsed = parseField(parsers[i], kv[1], isString[i]);
if (!parsed.f0) {
succ = false;
}
row.setField(i, parsed.f1);
cnt++;
}

if (cnt < fieldNames.length) {
succ = false;
}
return Tuple2.of(succ, row);
}
}

/**
* CsvParser parse a CSV formatted text line to a {@link Row}.
*/
public static class CsvParser implements StringParser {
com.alibaba.alink.operator.common.io.csv.CsvParser parser;

public CsvParser(TypeInformation[] types, String fieldDelim, @Nullable Character quoteChar) {
this.parser = new com.alibaba.alink.operator.common.io.csv.CsvParser(types, fieldDelim, quoteChar);
}

@Override
public Tuple2<Boolean, Row> parse(String line) {
return parser.parse(line);
}
}

static FieldParser<?> getFieldParser(Class typeClazz) {
Class<? extends FieldParser<?>> parserType = FieldParser.getParserForType(typeClazz);
if (parserType == null) {
throw new RuntimeException("No parser available for type '" + typeClazz.getName() + "'.");
}
return InstantiationUtil.instantiate(parserType, FieldParser.class);
}

static Tuple2<Boolean, Object> parseField(FieldParser<?> parser, String token, boolean isStringField) {
if (isStringField) {
return Tuple2.of(true, token);
} else {
if (StringUtils.isNullOrWhitespaceOnly(token)) {
return Tuple2.of(false, null);
}
byte[] bytes = token.getBytes();
parser.resetErrorStateAndParse(bytes, 0, bytes.length, new byte[]{0}, null);
FieldParser.ParseErrorState errorState = parser.getErrorState();
if (errorState != FieldParser.ParseErrorState.NONE) {
return Tuple2.of(false, null);
} else {
return Tuple2.of(true, parser.getLastResult());
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package com.alibaba.alink.operator.common.dataproc;

import com.alibaba.alink.common.mapper.Mapper;
import com.alibaba.alink.common.utils.OutputColsHelper;
import com.alibaba.alink.common.utils.TableUtil;
import com.alibaba.alink.operator.common.io.csv.CsvUtil;
import com.alibaba.alink.params.dataproc.CsvToColumnsParams;
import com.alibaba.alink.params.dataproc.JsonToColumnsParams;
import com.alibaba.alink.params.dataproc.KvToColumnsParams;
import com.alibaba.alink.params.dataproc.StringToColumnsParams;
import com.alibaba.alink.params.dataproc.vector.VectorToColumnsParams;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;

public class StringToColumnsMappers {

public static abstract class BaseStringToColumnsMapper extends Mapper {

private OutputColsHelper outputColsHelper;
private int idx;
private String[] fieldNames;
private TypeInformation[] fieldTypes;
private transient StringParsers.StringParser parser;
private transient StringToColumnsParams.HandleInvalid handleInvalid;
private transient boolean hasOpen;

public BaseStringToColumnsMapper(TableSchema dataSchema, Params params) {
super(dataSchema, params);
String selectedColName = this.params.get(JsonToColumnsParams.SELECTED_COL);
idx = TableUtil.findColIndex(dataSchema.getFieldNames(), selectedColName);
String schemaStr = this.params.get(JsonToColumnsParams.SCHEMA_STR);
TableSchema schema = CsvUtil.schemaStr2Schema(schemaStr);
this.fieldNames = schema.getFieldNames();
this.fieldTypes = schema.getFieldTypes();
this.outputColsHelper = new OutputColsHelper(dataSchema, schema.getFieldNames(), schema.getFieldTypes(),
this.params.get(VectorToColumnsParams.RESERVED_COLS));
}

public void open() {
this.parser = getParser(fieldNames, fieldTypes, params);
this.handleInvalid = params.get(StringToColumnsParams.HANDLE_INVALID);
}

@Override
public Row map(Row row) {
if (!hasOpen) {
open();
hasOpen = true;
}
String text = (String) row.getField(idx);
Tuple2<Boolean, Row> parsed = parser.parse(text);

if (!parsed.f0 && handleInvalid == StringToColumnsParams.HandleInvalid.ERROR) {
throw new RuntimeException("Fail to parse \"" + text + "\"");
}
return outputColsHelper.getResultRow(row, parsed.f1);
}

/**
* Get the output data schema.
*/
@Override
public TableSchema getOutputSchema() {
return outputColsHelper.getResultSchema();
}

abstract protected StringParsers.StringParser getParser(
String[] fieldNames, TypeInformation[] fieldTypes, Params params);
}

public static class CsvToColumnsMapper extends BaseStringToColumnsMapper {
public CsvToColumnsMapper(TableSchema dataSchema, Params params) {
super(dataSchema, params);
}

@Override
protected StringParsers.StringParser getParser(String[] fieldNames, TypeInformation[] fieldTypes, Params params) {
String fieldDelim = params.get(CsvToColumnsParams.FIELD_DELIMITER);
Character quoteChar = params.get(CsvToColumnsParams.QUOTE_CHAR);
return new StringParsers.CsvParser(fieldTypes, fieldDelim, quoteChar);
}
}

public static class JsonToColumnsMapper extends BaseStringToColumnsMapper {
public JsonToColumnsMapper(TableSchema dataSchema, Params params) {
super(dataSchema, params);
}

@Override
protected StringParsers.StringParser getParser(String[] fieldNames, TypeInformation[] fieldTypes, Params params) {
return new StringParsers.JsonParser(fieldNames, fieldTypes);
}
}

public static class KvToColumnsMapper extends BaseStringToColumnsMapper {
public KvToColumnsMapper(TableSchema dataSchema, Params params) {
super(dataSchema, params);
}

@Override
protected StringParsers.StringParser getParser(String[] fieldNames, TypeInformation[] fieldTypes, Params params) {
String colDelim = params.get(KvToColumnsParams.COL_DELIMITER);
String valDelim = params.get(KvToColumnsParams.VAL_DELIMITER);
return new StringParsers.KvParser(fieldNames, fieldTypes, colDelim, valDelim);
}
}
}
Loading

0 comments on commit 5a090a3

Please sign in to comment.