diff --git a/.gitignore b/.gitignore
index a32168f1a..d7842e1ed 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,6 +8,7 @@ target/
 *.eclipse.*
 *.iml
 plugins/
+sqlplugins/
 lib/
 .vertx/
 .DS_Store
diff --git a/core/src/main/java/com/dtstack/flink/sql/option/Options.java b/core/src/main/java/com/dtstack/flink/sql/option/Options.java
index ba6296d1e..b7d425a53 100644
--- a/core/src/main/java/com/dtstack/flink/sql/option/Options.java
+++ b/core/src/main/java/com/dtstack/flink/sql/option/Options.java
@@ -72,6 +72,10 @@ public class Options {
     @OptionRequired(description = "log level")
     private String logLevel = "info";
 
+    @OptionRequired(description = "files to add to the yarn ship files")
+    private String addShipfile;
+
     public String getMode() {
         return mode;
     }
@@ -183,4 +187,13 @@ public String getLogLevel() {
     public void setLogLevel(String logLevel) {
         this.logLevel = logLevel;
     }
+
+    public String getAddShipfile() {
+        return addShipfile;
+    }
+
+    public void setAddShipfile(String addShipfile) {
+        this.addShipfile = addShipfile;
+    }
 }
diff --git a/core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java b/core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java
index 9ec5dcc0c..c0b55f7ee 100644
--- a/core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java
+++ b/core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java
@@ -22,11 +22,12 @@
 import com.dtstack.flink.sql.util.DtStringUtil;
 import org.apache.calcite.config.Lex;
 import org.apache.calcite.sql.SqlBasicCall;
 import org.apache.calcite.sql.SqlJoin;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlMatchRecognize;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlSelect;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.parser.SqlParser;
 import com.google.common.collect.Lists;
@@ -164,6 +160,10 @@ private static void parseNode(SqlNode sqlNode, CreateTmpTableParser.SqlParserRes
                 parseNode(unionRight, sqlParseResult);
             }
             break;
+            case MATCH_RECOGNIZE:
+                SqlMatchRecognize node = (SqlMatchRecognize) sqlNode;
+                sqlParseResult.addSourceTable(node.getTableRef().toString());
+                break;
             default:
                 //do nothing
                 break;
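A quick standalone sketch of what the new `MATCH_RECOGNIZE` branch resolves. This is not part of the patch; the query text is illustrative and the parser configuration (`Lex.MYSQL`) mirrors how this project configures Calcite elsewhere:

```java
import org.apache.calcite.config.Lex;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlMatchRecognize;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.parser.SqlParser;

public class MatchRecognizeSourceDemo {
    public static void main(String[] args) throws Exception {
        String sql = "SELECT * FROM MyTable MATCH_RECOGNIZE ("
                + " ORDER BY ts"
                + " MEASURES A.id AS aid"
                + " PATTERN (A B)"
                + " DEFINE A AS A.pv > 0, B AS B.pv > 10) AS T";
        SqlParser parser = SqlParser.create(sql,
                SqlParser.configBuilder().setLex(Lex.MYSQL).build());
        SqlSelect select = (SqlSelect) parser.parseStmt();
        SqlNode from = select.getFrom();
        // when the clause is aliased, FROM is an AS call wrapping the MATCH_RECOGNIZE node
        if (from.getKind() == SqlKind.AS) {
            from = ((SqlBasicCall) from).getOperands()[0];
        }
        if (from.getKind() == SqlKind.MATCH_RECOGNIZE) {
            // same call the parser branch above uses to register the source table
            System.out.println(((SqlMatchRecognize) from).getTableRef()); // MyTable
        }
    }
}
```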
diff --git a/core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java b/core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java
index e9d8cc179..2cacd01d4 100644
--- a/core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java
+++ b/core/src/main/java/com/dtstack/flink/sql/parser/InsertSqlParser.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
-
 package com.dtstack.flink.sql.parser;
@@ -153,6 +152,7 @@ private static void parseNode(SqlNode sqlNode, SqlParseResult sqlParseResult){
 
     /**
      * Convert the sqlNodes of the top-level select into AsNodes to resolve field-name conflicts.
+     * Only fields of the form table.xx are rewritten.
      * @param selectList the select fields of the select node
      * @param sqlSelect the top-level selectNode produced by parsing
      */
@@ -160,7 +160,8 @@ private static void rebuildSelectNode(SqlNodeList selectList, SqlSelect sqlSelec
         SqlNodeList sqlNodes = new SqlNodeList(selectList.getParserPosition());
 
         for (int index = 0; index < selectList.size(); index++) {
-            if (selectList.get(index).getKind().equals(SqlKind.AS)) {
+            if (selectList.get(index).getKind().equals(SqlKind.AS)
+                    || (selectList.get(index) instanceof SqlIdentifier
+                        && ((SqlIdentifier) selectList.get(index)).names.size() == 1)) {
                 sqlNodes.add(selectList.get(index));
                 continue;
             }
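The `instanceof` guard added above matters because not every un-aliased select item is a `SqlIdentifier`; a bare aggregate call would make an unconditional cast throw. A compact illustration, reusing the same Calcite harness as the previous sketch (query text is hypothetical):

```java
SqlParser parser = SqlParser.create(
        "select a.name, count(a.id) from MyTable a group by a.name",
        SqlParser.configBuilder().setLex(Lex.MYSQL).build());
SqlSelect select = (SqlSelect) parser.parseStmt();
for (SqlNode item : select.getSelectList()) {
    // a.name is a SqlIdentifier; count(a.id) is a call node, and casting it
    // to SqlIdentifier would throw ClassCastException - hence the guard.
    System.out.println(item.getKind() + " : " + item.getClass().getSimpleName());
}
```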
diff --git a/core/src/main/java/com/dtstack/flink/sql/util/AuthUtil.java b/core/src/main/java/com/dtstack/flink/sql/util/AuthUtil.java
new file mode 100644
index 000000000..646a11b66
--- /dev/null
+++ b/core/src/main/java/com/dtstack/flink/sql/util/AuthUtil.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql.util;
+
+import org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/**
+ * Utility methods for helping with security tasks.
+ * Date: 2019/12/28
+ * Company: www.dtstack.com
+ * @author maqi
+ */
+public class AuthUtil {
+
+    public static String creatJaasFile(String prefix, String suffix, JAASConfig jaasConfig) throws IOException {
+        File krbConf = new File(System.getProperty("user.dir"));
+        File temp = File.createTempFile(prefix, suffix, krbConf);
+        temp.deleteOnExit();
+        FileUtils.writeStringToFile(temp, jaasConfig.toString(), StandardCharsets.UTF_8);
+        return temp.getAbsolutePath();
+    }
+
+    public static class JAASConfig {
+        private String entryName;
+        private String loginModule;
+        private String loginModuleFlag;
+        private Map<String, String> loginModuleOptions;
+
+        public JAASConfig(String entryName, String loginModule, String loginModuleFlag, Map<String, String> loginModuleOptions) {
+            this.entryName = entryName;
+            this.loginModule = loginModule;
+            this.loginModuleFlag = loginModuleFlag;
+            this.loginModuleOptions = loginModuleOptions;
+        }
+
+        public static Builder builder() {
+            return new Builder();
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder stringBuilder = new StringBuilder(entryName).append(" {\n\t")
+                    .append(loginModule).append(" ").append(loginModuleFlag).append("\n\t");
+            String[] keys = loginModuleOptions.keySet().toArray(new String[loginModuleOptions.size()]);
+            for (int i = 0; i < keys.length; i++) {
+                stringBuilder.append(keys[i]).append("=").append(loginModuleOptions.get(keys[i]));
+                if (i != keys.length - 1) {
+                    stringBuilder.append("\n\t");
+                } else {
+                    stringBuilder.append(";\n");
+                }
+            }
+            stringBuilder.append("\n").append("};");
+            return stringBuilder.toString();
+        }
+
+        public static class Builder {
+            private String entryName;
+            private String loginModule;
+            private String loginModuleFlag;
+            private Map<String, String> loginModuleOptions;
+
+            public Builder setEntryName(String entryName) {
+                this.entryName = entryName;
+                return this;
+            }
+
+            public Builder setLoginModule(String loginModule) {
+                this.loginModule = loginModule;
+                return this;
+            }
+
+            public Builder setLoginModuleFlag(String loginModuleFlag) {
+                this.loginModuleFlag = loginModuleFlag;
+                return this;
+            }
+
+            public Builder setLoginModuleOptions(Map<String, String> loginModuleOptions) {
+                this.loginModuleOptions = loginModuleOptions;
+                return this;
+            }
+
+            public JAASConfig build() {
+                return new JAASConfig(
+                        entryName, loginModule, loginModuleFlag, loginModuleOptions);
+            }
+        }
+    }
+}
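A hedged usage sketch of the builder above (values are illustrative; `creatJaasFile` writes the rendered entry to a temp file in the working directory and returns its path):

```java
import com.dtstack.flink.sql.util.AuthUtil;
import com.google.common.collect.ImmutableMap;

public class JaasDemo {
    public static void main(String[] args) throws Exception {
        AuthUtil.JAASConfig jaas = AuthUtil.JAASConfig.builder()
                .setEntryName("Client")
                .setLoginModule("com.sun.security.auth.module.Krb5LoginModule")
                .setLoginModuleFlag("required")
                .setLoginModuleOptions(ImmutableMap.of(
                        "useKeyTab", "true",
                        "keyTab", "\"test.keytab\"",
                        "principal", "\"test@DTSTACK.COM\""))
                .build();
        // toString() renders a JAAS entry roughly like:
        // Client {
        //     com.sun.security.auth.module.Krb5LoginModule required
        //     useKeyTab=true
        //     keyTab="test.keytab"
        //     principal="test@DTSTACK.COM";
        // };
        System.out.println(AuthUtil.creatJaasFile("JAAS", ".conf", jaas));
    }
}
```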
diff --git a/docs/config.md b/docs/config.md
index 9aa8df994..27c4244e7 100644
--- a/docs/config.md
+++ b/docs/config.md
@@ -46,6 +46,11 @@ sh submit.sh -key1 val1 -key2 val2
   * Description: path of extension jars, currently mainly jars containing UDF definitions;
   * Required: no
   * Default: none
+
+* **addShipfile**
+  * Description: extra files to upload with the job, e.g. the keytab and krb5.conf files needed when Kerberos authentication is enabled
+  * Required: no
+  * Default: none
 
 * **confProp**
   * Description: miscellaneous parameter settings
diff --git a/docs/plugin/hbaseSide.md b/docs/plugin/hbaseSide.md
index e590b02e7..29dc60bf9 100644
--- a/docs/plugin/hbaseSide.md
+++ b/docs/plugin/hbaseSide.md
@@ -43,7 +43,14 @@
 | tableName | hbase table name |yes||
 | cache | side-table cache strategy (NONE/LRU) |no|NONE|
 | partitionedJoin | whether to keyBy on the join key before joining the side table (reduces the amount of cached side-table data) |no|false|
-
+|kerberosAuthEnable | whether to enable kerberos authentication |no|false|
+|regionserverPrincipal | the regionserver principal, taken from the hbase.regionserver.kerberos.principal property in hbase-site.xml |no||
+|clientKeytabFile | the client keytab file |no||
+|clientPrincipal | the client principal |no||
+|zookeeperSaslClient | the zookeeper.sasl.client value |no|true|
+|securityKrb5Conf | the java.security.krb5.conf value |no||
+
+Enabling Kerberos authentication additionally requires configuring krb5 in the VM options, e.g. -Djava.security.krb5.conf=/Users/xuchao/Documents/flinkSql/kerberos/krb5.conf,
+and adding the keytab file path to the addShipfile option; see [command-line options](../config.md) for details.
 
--------------
 ## 5. Samples
@@ -168,4 +175,75 @@
     into sideTable b on a.id=b.rowkey1 and a.name = b.rowkey2;
 ```
 
+### Kerberos side-table sample
+```
+CREATE TABLE MyTable(
+    name varchar,
+    channel varchar,
+    pv INT,
+    xctime bigint
+)WITH(
+    type ='kafka11',
+    bootstrapServers ='172.16.8.107:9092',
+    zookeeperQuorum ='172.16.8.107:2181/kafka',
+    offsetReset ='latest',
+    topic ='es_test',
+    timezone='Asia/Shanghai',
+    updateMode ='append',
+    enableKeyPartitions ='false',
+    topicIsPattern ='false',
+    parallelism ='1'
+);
+
+CREATE TABLE MyResult(
+    name varchar,
+    channel varchar
+)WITH(
+    type ='mysql',
+    url ='jdbc:mysql://172.16.10.45:3306/test',
+    userName ='dtstack',
+    password ='abc123',
+    tableName ='myresult',
+    updateMode ='append',
+    parallelism ='1',
+    batchSize ='100',
+    batchWaitInterval ='1000'
+);
+
+CREATE TABLE sideTable(
+    cf:name varchar as name,
+    cf:info varchar as info,
+    PRIMARY KEY(md5(name) +'test') ,
+    PERIOD FOR SYSTEM_TIME
+)WITH(
+    type ='hbase',
+    zookeeperQuorum ='172.16.10.104:2181,172.16.10.224:2181,172.16.10.252:2181',
+    zookeeperParent ='/hbase',
+    tableName ='workerinfo',
+    partitionedJoin ='false',
+    cache ='LRU',
+    cacheSize ='10000',
+    cacheTTLMs ='60000',
+    asyncTimeoutNum ='0',
+    parallelism ='1',
+    kerberosAuthEnable='true',
+    regionserverPrincipal='hbase/_HOST@DTSTACK.COM',
+    clientKeytabFile='test.keytab',
+    clientPrincipal='test@DTSTACK.COM',
+    securityKrb5Conf='krb5.conf'
+);
+
+insert into
+    MyResult
+select
+    b.name as name,
+    a.channel
+from
+    MyTable a
+join
+    sideTable b
+on a.channel=b.name
+```
diff --git a/docs/plugin/hbaseSink.md b/docs/plugin/hbaseSink.md
index ef1be339b..5006f11a2 100644
--- a/docs/plugin/hbaseSink.md
+++ b/docs/plugin/hbaseSink.md
@@ -37,9 +37,17 @@ hbase2.0
 |rowkey | the columns the hbase rowkey is built from, multiple values separated by commas |yes||
 |updateMode | APPEND: never retracts, only emits incremental data; UPSERT: deletes the retracted row first, then updates |no|APPEND|
 |parallelism | parallelism |no|1|
-
-
+|kerberosAuthEnable | whether to enable kerberos authentication |no|false|
+|regionserverPrincipal | the regionserver principal, taken from the hbase.regionserver.kerberos.principal property in hbase-site.xml |no||
+|clientKeytabFile | the client keytab file |no||
+|clientPrincipal | the client principal |no||
+|zookeeperSaslClient | the zookeeper.sasl.client value |no|true|
+|securityKrb5Conf | the java.security.krb5.conf value |no||
+
+Enabling Kerberos authentication additionally requires configuring krb5 in the VM options, e.g. -Djava.security.krb5.conf=/Users/xuchao/Documents/flinkSql/kerberos/krb5.conf,
+and adding the keytab file path to the addShipfile option; see [command-line options](../config.md) for details.
 
 ## 5. Samples:
+
+### Plain result-table sample
 ```
 CREATE TABLE MyTable(
     name varchar,
@@ -78,9 +86,59 @@
 into
     channel,
     name
 from
-    MyTable a
+    MyTable a
+
+
 ```
+### Kerberos-authenticated result-table sample
+```
+CREATE TABLE MyTable(
+    name varchar,
+    channel varchar,
+    age int
+)WITH(
+    type ='kafka10',
+    bootstrapServers ='172.16.8.107:9092',
+    zookeeperQuorum ='172.16.8.107:2181/kafka',
+    offsetReset ='latest',
+    topic ='mqTest01',
+    timezone='Asia/Shanghai',
+    updateMode ='append',
+    enableKeyPartitions ='false',
+    topicIsPattern ='false',
+    parallelism ='1'
+);
+
+CREATE TABLE MyResult(
+    cf:name varchar,
+    cf:channel varchar
+)WITH(
+    type ='hbase',
+    zookeeperQuorum ='cdh2.cdhsite:2181,cdh4.cdhsite:2181',
+    zookeeperParent ='/hbase',
+    tableName ='myresult',
+    partitionedJoin ='false',
+    parallelism ='1',
+    rowKey='name',
+    kerberosAuthEnable='true',
+    regionserverPrincipal='hbase/_HOST@DTSTACK.COM',
+    clientKeytabFile='test.keytab',
+    clientPrincipal='test@DTSTACK.COM',
+    securityKrb5Conf='krb5.conf'
+);
+
+insert
+into
+    MyResult
+select
+    channel,
+    name
+from
+    MyTable a
+```
+
 ## 6. hbase data
 ### Data layout
 The hbase rowkey is built from the declared rowkey field values; multiple fields are joined with '-'.
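For reference, a hedged sketch of how the WITH options above end up in the HBase client configuration; this mirrors `fillSyncKerberosConfig` further down in this PR, and host names, principals and file names are illustrative:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class KerberosConfDemo {
    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "cdh2.cdhsite:2181,cdh4.cdhsite:2181");
        conf.set("zookeeper.znode.parent", "/hbase");
        conf.set("hbase.security.authentication", "kerberos");
        conf.set("hbase.security.authorization", "true");
        conf.set("hbase.regionserver.kerberos.principal", "hbase/_HOST@DTSTACK.COM");
        conf.set("hbase.master.kerberos.principal", "hbase/_HOST@DTSTACK.COM");
        // krb5.conf and the keytab are resolved relative to the container working
        // directory, which is why they are distributed via the addShipfile option
        System.setProperty("java.security.krb5.conf",
                System.getProperty("user.dir") + "/krb5.conf");
    }
}
```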
diff --git a/docs/plugin/redisSink.md b/docs/plugin/redisSink.md
index 103cb4997..eb0109f06 100644
--- a/docs/plugin/redisSink.md
+++ b/docs/plugin/redisSink.md
@@ -94,7 +94,8 @@ redis5.0
 ## 6. Complete redis sample
 ### redis data layout
-redis stores plain k-v pairs whose keys are built as tableName:privateKey:privateKeyValue:columnName, with value=columnValue
+redis uses the hash data structure: key=tableName_primaryKey1_primaryKey2, value={column1=value1, column2=value2}
+Taking a class table with id and name as its union primary key as an example, the redis layout is as follows:
 
 ### Source table data
 ```
@@ -103,10 +104,10 @@
 ### Actual redis content
 ```
 127.0.0.1:6379> keys *
-1) "resultTable:name:roc:name"
-2) "resultTable:name:roc:channel"
-127.0.0.1:6379> get "resultTable:name:roc:name"
-"roc"
-127.0.0.1:6379> get "resultTable:name:roc:channel"
-"daishu"
+1) "resultTable_roc"
+127.0.0.1:6379> hgetall resultTable_roc
+1) "channel"
+2) "daishu"
+3) "name"
+4) "roc"
 ```
\ No newline at end of file
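A small sketch of the key-building rule the doc describes; it mirrors `RedisOutputFormat.buildCacheKey` further down in this PR (table and row values taken from the sample above):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RedisKeyDemo {
    public static void main(String[] args) {
        String tableName = "resultTable";
        List<String> primaryKeys = Arrays.asList("name");
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("name", "roc");
        row.put("channel", "daishu");

        StringBuilder key = new StringBuilder(tableName);
        for (String pk : primaryKeys) {
            key.append("_").append(row.get(pk)); // key is built from primary-key values
        }
        System.out.println(key); // resultTable_roc
        // each column then becomes one hash field:
        // HSET resultTable_roc name roc / HSET resultTable_roc channel daishu
    }
}
```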
diff --git a/hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java b/hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java
index 63d26d5dd..376bccd81 100644
--- a/hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java
+++ b/hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java
@@ -25,8 +25,10 @@
 import com.dtstack.flink.sql.side.FieldInfo;
 import com.dtstack.flink.sql.side.JoinInfo;
 import com.dtstack.flink.sql.side.hbase.table.HbaseSideTableInfo;
+import com.dtstack.flink.sql.side.hbase.utils.HbaseConfigUtils;
 import org.apache.calcite.sql.JoinType;
 import org.apache.commons.collections.map.HashedMap;
+import org.apache.commons.lang.StringUtils;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import com.google.common.collect.Maps;
 import org.apache.flink.table.runtime.types.CRow;
@@ -36,6 +38,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -44,10 +47,13 @@
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
+import java.security.PrivilegedAction;
 import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.util.Calendar;
@@ -65,6 +71,10 @@ public class HbaseAllReqRow extends BaseAllReqRow {
     private Map<String, String> aliasNameInversion;
 
     private AtomicReference<Map<String, Map<String, Object>>> cacheRef = new AtomicReference<>();
+    private Connection conn = null;
+    private Table table = null;
+    private ResultScanner resultScanner = null;
+    private Configuration conf = null;
 
     public HbaseAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
         super(new HbaseAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
@@ -166,13 +176,37 @@ public void flatMap(CRow input, Collector<CRow> out) throws Exception {
     private void loadData(Map<String, Map<String, Object>> tmpCache) throws SQLException {
         AbstractSideTableInfo sideTableInfo = sideInfo.getSideTableInfo();
         HbaseSideTableInfo hbaseSideTableInfo = (HbaseSideTableInfo) sideTableInfo;
-        Configuration conf = new Configuration();
-        conf.set("hbase.zookeeper.quorum", hbaseSideTableInfo.getHost());
-        Connection conn = null;
-        Table table = null;
-        ResultScanner resultScanner = null;
+        boolean openKerberos = hbaseSideTableInfo.isKerberosAuthEnable();
+        int loadDataCount = 0;
         try {
-            conn = ConnectionFactory.createConnection(conf);
+            if (openKerberos) {
+                conf = HbaseConfigUtils.getHadoopConfiguration(hbaseSideTableInfo.getHbaseConfig());
+                conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM, hbaseSideTableInfo.getHost());
+                conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM, hbaseSideTableInfo.getParent());
+                String principal = HbaseConfigUtils.getPrincipal(hbaseSideTableInfo.getHbaseConfig());
+                String keytab = HbaseConfigUtils.getKeytab(hbaseSideTableInfo.getHbaseConfig());
+
+                UserGroupInformation userGroupInformation = HbaseConfigUtils.loginAndReturnUGI(conf, principal, keytab);
+                Configuration finalConf = conf;
+                conn = userGroupInformation.doAs(new PrivilegedAction<Connection>() {
+                    @Override
+                    public Connection run() {
+                        try {
+                            return ConnectionFactory.createConnection(finalConf);
+                        } catch (IOException e) {
+                            LOG.error("Get connection fail with config: {}", finalConf);
+                            throw new RuntimeException(e);
+                        }
+                    }
+                });
+
+            } else {
+                conf = HbaseConfigUtils.getConfig(hbaseSideTableInfo.getHbaseConfig());
+                conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM, hbaseSideTableInfo.getHost());
+                conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM, hbaseSideTableInfo.getParent());
+                conn = ConnectionFactory.createConnection(conf);
+            }
+
             table = conn.getTable(TableName.valueOf(tableName));
             resultScanner = table.getScanner(new Scan());
             for (Result r : resultScanner) {
@@ -187,13 +221,15 @@
                     kv.put(aliasNameInversion.get(key.toString()), value);
                 }
+                loadDataCount++;
                 tmpCache.put(new String(r.getRow()), kv);
             }
         } catch (IOException e) {
-            LOG.error("", e);
+            throw new RuntimeException(e);
         } finally {
+            LOG.info("loadData count: {}", loadDataCount);
             try {
-                if (null != conn && !conn.isClosed()) {
+                if (null != conn) {
                     conn.close();
                 }
 
@@ -209,4 +245,5 @@
             }
         }
     }
+
 }
\ No newline at end of file
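The login-then-doAs pattern used in `loadData` above, reduced to its core. A minimal sketch assuming a fully populated Kerberos `Configuration`; principal and keytab values are illustrative:

```java
import java.security.PrivilegedAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.security.UserGroupInformation;

public class UgiConnectDemo {
    public static Connection connect() throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hadoop.security.authentication", "Kerberos");
        UserGroupInformation.setConfiguration(conf);
        UserGroupInformation ugi = UserGroupInformation
                .loginUserFromKeytabAndReturnUGI("test@DTSTACK.COM", "/path/test.keytab");
        // every HBase RPC issued inside run() carries the logged-in subject
        return ugi.doAs((PrivilegedAction<Connection>) () -> {
            try {
                return ConnectionFactory.createConnection(conf);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }
}
```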
diff --git a/hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java b/hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java
index 56f50e27c..a6bfaca7a 100644
--- a/hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java
+++ b/hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java
@@ -31,6 +31,8 @@
 import com.dtstack.flink.sql.side.hbase.rowkeydealer.RowKeyEqualModeDealer;
 import com.dtstack.flink.sql.side.hbase.table.HbaseSideTableInfo;
 import com.dtstack.flink.sql.factory.DTThreadFactory;
+import com.dtstack.flink.sql.side.hbase.utils.HbaseConfigUtils;
+import com.dtstack.flink.sql.util.AuthUtil;
 import com.google.common.collect.Maps;
 import com.stumbleupon.async.Deferred;
 import org.apache.commons.lang3.StringUtils;
@@ -40,10 +42,13 @@
 import org.apache.flink.table.runtime.types.CRow;
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
 import org.apache.flink.types.Row;
+import org.hbase.async.Config;
 import org.hbase.async.HBaseClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
 import java.sql.Timestamp;
 import java.util.Collections;
 import java.util.List;
@@ -93,11 +98,27 @@ public void open(Configuration parameters) throws Exception {
         super.open(parameters);
         AbstractSideTableInfo sideTableInfo = sideInfo.getSideTableInfo();
         HbaseSideTableInfo hbaseSideTableInfo = (HbaseSideTableInfo) sideTableInfo;
+        Map<String, Object> hbaseConfig = hbaseSideTableInfo.getHbaseConfig();
+
         ExecutorService executorService =new ThreadPoolExecutor(DEFAULT_POOL_SIZE, DEFAULT_POOL_SIZE, 0L, TimeUnit.MILLISECONDS,
                 new LinkedBlockingQueue<>(), new DTThreadFactory("hbase-aysnc"));
-        hBaseClient = new HBaseClient(hbaseSideTableInfo.getHost(), hbaseSideTableInfo.getParent(), executorService);
+        Config config = new Config();
+        config.overrideConfig(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM, hbaseSideTableInfo.getHost());
+        config.overrideConfig(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM, hbaseSideTableInfo.getParent());
+        HbaseConfigUtils.loadKrb5Conf(hbaseConfig);
+        hbaseConfig.forEach((key, value) -> config.overrideConfig(key, (String) value));
+
+        if (HbaseConfigUtils.asyncOpenKerberos(hbaseConfig)) {
+            String jaasStr = HbaseConfigUtils.buildJaasStr(hbaseConfig);
+            String jaasFilePath = HbaseConfigUtils.creatJassFile(jaasStr);
+            config.overrideConfig(HbaseConfigUtils.KEY_JAVA_SECURITY_AUTH_LOGIN_CONF, jaasFilePath);
+        }
+
+        hBaseClient = new HBaseClient(config, executorService);
 
         try {
             Deferred deferred = hBaseClient.ensureTableExists(tableName)
@@ -166,7 +187,6 @@ public void close() throws Exception {
         hBaseClient.shutdown();
     }
 
-
     class CheckResult{
 
         private boolean connect;
diff --git a/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideParser.java b/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideParser.java
index 80753c40b..03868d618 100644
--- a/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideParser.java
+++ b/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideParser.java
@@ -69,6 +69,10 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
+        props.entrySet().stream()
+                .filter(entry -> entry.getKey().contains("."))
+                .forEach(entry -> hbaseTableInfo.getHbaseConfig().put(entry.getKey(), String.valueOf(entry.getValue())));
         return hbaseTableInfo;
     }
diff --git a/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideTableInfo.java b/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideTableInfo.java
index 3cedb2c68..51597d583 100644
--- a/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideTableInfo.java
+++ b/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideTableInfo.java
@@ -50,8 +50,22 @@ public class HbaseSideTableInfo extends AbstractSideTableInfo {
 
     private Map<String, String> columnNameFamily;
 
+    private Map<String, Object> hbaseConfig = Maps.newHashMap();
+
+    private String tableName;
+
+    private boolean kerberosAuthEnable;
+
+    private String regionserverKeytabFile;
+
+    private String regionserverPrincipal;
+
+    private String jaasPrincipal;
+
+    private String securityKrb5Conf;
+
+    private String zookeeperSaslClient;
+
     private String[] columnRealNames;
 
     private List<String> columnRealNameList = Lists.newArrayList();
@@ -148,6 +162,63 @@ public void setPreRowKey(boolean preRowKey) {
         this.preRowKey = preRowKey;
     }
 
+    public boolean isKerberosAuthEnable() {
+        return kerberosAuthEnable;
+    }
+
+    public void setKerberosAuthEnable(boolean kerberosAuthEnable) {
this.kerberosAuthEnable = kerberosAuthEnable; + } + + public String getRegionserverKeytabFile() { + return regionserverKeytabFile; + } + + public void setRegionserverKeytabFile(String regionserverKeytabFile) { + this.regionserverKeytabFile = regionserverKeytabFile; + } + + public String getRegionserverPrincipal() { + return regionserverPrincipal; + } + + public void setRegionserverPrincipal(String regionserverPrincipal) { + this.regionserverPrincipal = regionserverPrincipal; + } + + public String getJaasPrincipal() { + return jaasPrincipal; + } + + public void setJaasPrincipal(String jaasPrincipal) { + this.jaasPrincipal = jaasPrincipal; + } + + public String getSecurityKrb5Conf() { + return securityKrb5Conf; + } + + public void setSecurityKrb5Conf(String securityKrb5Conf) { + this.securityKrb5Conf = securityKrb5Conf; + } + + public String getZookeeperSaslClient() { + return zookeeperSaslClient; + } + + public void setZookeeperSaslClient(String zookeeperSaslClient) { + this.zookeeperSaslClient = zookeeperSaslClient; + } + + public Map getHbaseConfig() { + return hbaseConfig; + } + + public void setHbaseConfig(Map hbaseConfig) { + this.hbaseConfig = hbaseConfig; + } + + @Override public void finish(){ super.finish(); diff --git a/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/utils/HbaseConfigUtils.java b/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/utils/HbaseConfigUtils.java new file mode 100644 index 000000000..a7708aaae --- /dev/null +++ b/hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/utils/HbaseConfigUtils.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.dtstack.flink.sql.side.hbase.utils; + +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * + * The utility class of HBase connection + * + * Date: 2019/12/24 + * Company: www.dtstack.com + * @author maqi + */ +public class HbaseConfigUtils { + + private static final Logger LOG = LoggerFactory.getLogger(HbaseConfigUtils.class); + // sync side kerberos + private final static String AUTHENTICATION_TYPE = "Kerberos"; + private final static String KEY_HBASE_SECURITY_AUTHENTICATION = "hbase.security.authentication"; + private final static String KEY_HBASE_SECURITY_AUTHORIZATION = "hbase.security.authorization"; + private final static String KEY_HBASE_MASTER_KERBEROS_PRINCIPAL = "hbase.master.kerberos.principal"; + private final static String KEY_HBASE_MASTER_KEYTAB_FILE = "hbase.master.keytab.file"; + private final static String KEY_HBASE_REGIONSERVER_KEYTAB_FILE = "hbase.regionserver.keytab.file"; + private final static String KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL = "hbase.regionserver.kerberos.principal"; + + // async side kerberos + private final static String KEY_HBASE_SECURITY_AUTH_ENABLE = "hbase.security.auth.enable"; + private final static String KEY_HBASE_SASL_CLIENTCONFIG = "hbase.sasl.clientconfig"; + private final static String KEY_HBASE_KERBEROS_REGIONSERVER_PRINCIPAL = "hbase.kerberos.regionserver.principal"; + private static final String KEY_KEY_TAB = "hbase.keytab"; + private static final String KEY_PRINCIPAL = "hbase.principal"; + + public final static String KEY_HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"; + public final static String KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM = "hbase.zookeeper.znode.parent"; + + + private static final String KEY_JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf"; + public static final String KEY_JAVA_SECURITY_AUTH_LOGIN_CONF = "java.security.auth.login.config"; + + + private static final String SP = File.separator; + private static final String KEY_KRB5_CONF = "krb5.conf"; + + + private static List KEYS_KERBEROS_REQUIRED = Arrays.asList( + KEY_HBASE_SECURITY_AUTHENTICATION, + KEY_HBASE_MASTER_KERBEROS_PRINCIPAL, + KEY_HBASE_MASTER_KEYTAB_FILE, + KEY_HBASE_REGIONSERVER_KEYTAB_FILE, + KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL + ); + + private static List ASYNC_KEYS_KERBEROS_REQUIRED = Arrays.asList( + KEY_HBASE_SECURITY_AUTH_ENABLE, + KEY_HBASE_SASL_CLIENTCONFIG, + KEY_HBASE_KERBEROS_REGIONSERVER_PRINCIPAL, + KEY_HBASE_SECURITY_AUTHENTICATION, + KEY_KEY_TAB); + + + public static Configuration getConfig(Map hbaseConfigMap) { + Configuration hConfiguration = HBaseConfiguration.create(); + + for (Map.Entry entry : hbaseConfigMap.entrySet()) { + if (entry.getValue() != null && !(entry.getValue() instanceof Map)) { + hConfiguration.set(entry.getKey(), entry.getValue().toString()); + } + } + return hConfiguration; + } + + public static boolean openKerberos(Map hbaseConfigMap) { + if (!MapUtils.getBooleanValue(hbaseConfigMap, KEY_HBASE_SECURITY_AUTHORIZATION)) { + return false; + } + return AUTHENTICATION_TYPE.equalsIgnoreCase(MapUtils.getString(hbaseConfigMap, 
KEY_HBASE_SECURITY_AUTHENTICATION));
+    }
+
+    public static boolean asyncOpenKerberos(Map<String, Object> hbaseConfigMap) {
+        if (!MapUtils.getBooleanValue(hbaseConfigMap, KEY_HBASE_SECURITY_AUTH_ENABLE)) {
+            return false;
+        }
+        return AUTHENTICATION_TYPE.equalsIgnoreCase(MapUtils.getString(hbaseConfigMap, KEY_HBASE_SECURITY_AUTHENTICATION));
+    }
+
+    public static Configuration getHadoopConfiguration(Map<String, Object> hbaseConfigMap) {
+        for (String key : KEYS_KERBEROS_REQUIRED) {
+            if (StringUtils.isEmpty(MapUtils.getString(hbaseConfigMap, key))) {
+                throw new IllegalArgumentException(String.format("Must provide [%s] when authentication is Kerberos", key));
+            }
+        }
+        loadKrb5Conf(hbaseConfigMap);
+
+        Configuration conf = new Configuration();
+        hbaseConfigMap.forEach((key, val) -> {
+            if (val != null) {
+                conf.set(key, val.toString());
+            }
+        });
+
+        return conf;
+    }
+
+    public static String getPrincipal(Map<String, Object> hbaseConfigMap) {
+        String principal = MapUtils.getString(hbaseConfigMap, KEY_HBASE_MASTER_KERBEROS_PRINCIPAL);
+        if (StringUtils.isNotEmpty(principal)) {
+            return principal;
+        }
+
+        throw new IllegalArgumentException(String.format("Must provide [%s] when authentication is Kerberos", KEY_HBASE_MASTER_KERBEROS_PRINCIPAL));
+    }
+
+    public static String getKeytab(Map<String, Object> hbaseConfigMap) {
+        String keytab = MapUtils.getString(hbaseConfigMap, KEY_HBASE_MASTER_KEYTAB_FILE);
+        if (StringUtils.isNotEmpty(keytab)) {
+            return keytab;
+        }
+
+        throw new IllegalArgumentException(String.format("Must provide [%s] when authentication is Kerberos", KEY_HBASE_MASTER_KEYTAB_FILE));
+    }
+
+    public static void loadKrb5Conf(Map<String, Object> kerberosConfig) {
+        String krb5FilePath = MapUtils.getString(kerberosConfig, KEY_JAVA_SECURITY_KRB5_CONF);
+        if (!StringUtils.isEmpty(krb5FilePath)) {
+            System.setProperty(KEY_JAVA_SECURITY_KRB5_CONF, krb5FilePath);
+        }
+    }
+
+    public static String creatJassFile(String configStr) throws IOException {
+        String fileName = System.getProperty("user.dir");
+        File krbConf = new File(fileName);
+        File temp = File.createTempFile("JAAS", ".conf", krbConf);
+        temp.deleteOnExit();
+        try (BufferedWriter out = new BufferedWriter(new FileWriter(temp, false))) {
+            out.write(configStr + "\n");
+        }
+        return temp.getAbsolutePath();
+    }
+
+    public static String buildJaasStr(Map<String, Object> kerberosConfig) {
+        for (String key : ASYNC_KEYS_KERBEROS_REQUIRED) {
+            if (StringUtils.isEmpty(MapUtils.getString(kerberosConfig, key))) {
+                throw new IllegalArgumentException(String.format("Must provide [%s] when authentication is Kerberos", key));
+            }
+        }
+
+        String keyTab = MapUtils.getString(kerberosConfig, KEY_KEY_TAB);
+        String principal = MapUtils.getString(kerberosConfig, KEY_PRINCIPAL);
+
+        StringBuilder jaasSB = new StringBuilder("Client {\n"
+                + "    com.sun.security.auth.module.Krb5LoginModule required\n"
+                + "    useKeyTab=true\n"
+                + "    useTicketCache=false\n");
+        jaasSB.append("    keyTab=\"").append(keyTab).append("\"").append("\n");
+        jaasSB.append("    principal=\"").append(principal).append("\"").append(";\n");
+        jaasSB.append("};");
+        return jaasSB.toString();
+    }
+
+    public static UserGroupInformation loginAndReturnUGI(Configuration conf, String principal, String keytab) throws IOException {
+        if (conf == null) {
+            throw new IllegalArgumentException("kerberos conf can not be null");
+        }
+
+        if (StringUtils.isEmpty(principal)) {
+            throw new IllegalArgumentException("principal can not be null");
+        }
+
+        if (StringUtils.isEmpty(keytab)) {
+            throw new IllegalArgumentException("keytab can not be null");
+        }
+
+        conf.set("hadoop.security.authentication", "Kerberos");
+        UserGroupInformation.setConfiguration(conf);
+
+        return UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
+    }
+}
diff --git a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseConfigUtils.java b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseConfigUtils.java
new file mode 100644
index 000000000..57c63d243
--- /dev/null
+++ b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseConfigUtils.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql.sink.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * The utility class of HBase connection
+ *
+ * Date: 2019/12/24
+ * Company: www.dtstack.com
+ * @author maqi
+ */
+public class HbaseConfigUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HbaseConfigUtils.class);
+    // sync side kerberos
+    public final static String KEY_HBASE_SECURITY_AUTHENTICATION = "hbase.security.authentication";
+    public final static String KEY_HBASE_SECURITY_AUTHORIZATION = "hbase.security.authorization";
+    public final static String KEY_HBASE_MASTER_KEYTAB_FILE = "hbase.master.keytab.file";
+    public final static String KEY_HBASE_MASTER_KERBEROS_PRINCIPAL = "hbase.master.kerberos.principal";
+    public final static String KEY_HBASE_REGIONSERVER_KEYTAB_FILE = "hbase.regionserver.keytab.file";
+    public final static String KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL = "hbase.regionserver.kerberos.principal";
+
+    public final static String KEY_HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
+    public final static String KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM = "zookeeper.znode.parent";
+
+    public final static String KEY_HBASE_CLIENT_KEYTAB_FILE = "hbase.client.keytab.file";
+    public final static String KEY_HBASE_CLIENT_KERBEROS_PRINCIPAL = "hbase.client.kerberos.principal";
+
+    public static final String KEY_JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";
+    public static final String KEY_ZOOKEEPER_SASL_CLIENT = "zookeeper.sasl.client";
+
+    public static UserGroupInformation loginAndReturnUGI(Configuration conf, String principal, String keytab) throws IOException {
+        LOG.info("loginAndReturnUGI principal {}", principal);
+        LOG.info("loginAndReturnUGI keytab {}", keytab);
+        if (conf == null) {
+            throw new IllegalArgumentException("kerberos conf can not be null");
+        }
+
+        if (org.apache.commons.lang.StringUtils.isEmpty(principal)) {
+            throw new IllegalArgumentException("principal can not be null");
+        }
+
+        if (org.apache.commons.lang.StringUtils.isEmpty(keytab)) {
+            throw new IllegalArgumentException("keytab can not be null");
+        }
+
+        conf.set("hadoop.security.authentication", "Kerberos");
+        UserGroupInformation.setConfiguration(conf);
+
+        return UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
+    }
+}
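For the async side table, authentication goes through asynchbase's own `Config` object plus a generated JAAS file rather than a Hadoop `Configuration`. A hedged sketch of that wiring, mirroring `HbaseAsyncReqRow.open` above (values and the JAAS file path are illustrative):

```java
import java.util.concurrent.Executors;
import org.hbase.async.Config;
import org.hbase.async.HBaseClient;

public class AsyncHBaseKerberosDemo {
    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.overrideConfig("hbase.zookeeper.quorum", "host1:2181");
        config.overrideConfig("hbase.zookeeper.znode.parent", "/hbase");
        config.overrideConfig("hbase.security.auth.enable", "true");
        config.overrideConfig("hbase.security.authentication", "kerberos");
        config.overrideConfig("hbase.sasl.clientconfig", "Client");
        config.overrideConfig("hbase.kerberos.regionserver.principal", "hbase/_HOST@DTSTACK.COM");
        // the file written by creatJassFile(buildJaasStr(...)) above contains:
        // Client {
        //     com.sun.security.auth.module.Krb5LoginModule required
        //     useKeyTab=true
        //     useTicketCache=false
        //     keyTab="test.keytab"
        //     principal="test@DTSTACK.COM";
        // };
        System.setProperty("java.security.auth.login.config", "/tmp/JAAS12345.conf");
        HBaseClient client = new HBaseClient(config, Executors.newFixedThreadPool(4));
        client.shutdown().join();
    }
}
```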
diff --git a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java
index e89ec0b46..cddef858e 100644
--- a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java
+++ b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java
@@ -28,18 +28,21 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.AuthUtil;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -60,6 +63,14 @@ public class HbaseOutputFormat extends AbstractDtRichOutputFormat {
     private String[] columnTypes;
     private Map<String, String> columnNameFamily;
 
+    private boolean kerberosAuthEnable;
+    private String regionserverKeytabFile;
+    private String regionserverPrincipal;
+    private String securityKrb5Conf;
+    private String zookeeperSaslClient;
+    private String clientPrincipal;
+    private String clientKeytabFile;
+
     private String[] families;
     private String[] qualifiers;
@@ -67,26 +78,78 @@
     private transient Connection conn;
     private transient Table table;
 
+    private transient ChoreService choreService;
+
     @Override
     public void configure(Configuration parameters) {
         LOG.warn("---configure---");
         conf = HBaseConfiguration.create();
-        conf.set("hbase.zookeeper.quorum", host);
-        if (zkParent != null && !"".equals(zkParent)) {
-            conf.set("zookeeper.znode.parent", zkParent);
-        }
-
         LOG.warn("---configure end ---");
     }
 
     @Override
     public void open(int taskNumber, int numTasks) throws IOException {
         LOG.warn("---open---");
-        conn = ConnectionFactory.createConnection(conf);
-
+        openConn();
         table = conn.getTable(TableName.valueOf(tableName));
         LOG.warn("---open end(get table from hbase) ---");
         initMetric();
     }
 
+    private void openConn() {
+        try {
+            if (kerberosAuthEnable) {
+                LOG.info("open kerberos conn");
+                openKerberosConn();
+            } else {
+                LOG.info("open conn");
+                conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM, host);
+                conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM, zkParent);
+                conn = ConnectionFactory.createConnection(conf);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void openKerberosConn() throws IOException {
+        conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM, host);
+        conf.set(HbaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM, zkParent);
+
+        LOG.info("kerberos config: {}", this.toString());
+        Preconditions.checkArgument(!StringUtils.isEmpty(clientPrincipal), "clientPrincipal must not be empty!");
+        Preconditions.checkArgument(!StringUtils.isEmpty(clientKeytabFile), "clientKeytabFile must not be empty!");
+
+        fillSyncKerberosConfig(conf, regionserverPrincipal, zookeeperSaslClient, securityKrb5Conf);
+
+        clientKeytabFile = System.getProperty("user.dir") + File.separator + clientKeytabFile;
+
+        conf.set(HbaseConfigUtils.KEY_HBASE_CLIENT_KEYTAB_FILE, clientKeytabFile);
+        conf.set(HbaseConfigUtils.KEY_HBASE_CLIENT_KERBEROS_PRINCIPAL, clientPrincipal);
+
+        UserGroupInformation userGroupInformation = HbaseConfigUtils.loginAndReturnUGI(conf, clientPrincipal, clientKeytabFile);
+        org.apache.hadoop.conf.Configuration finalConf = conf;
+        conn = userGroupInformation.doAs(new PrivilegedAction<Connection>() {
+            @Override
+            public Connection run() {
+                try {
+                    ScheduledChore authChore = AuthUtil.getAuthChore(finalConf);
+                    if (authChore != null) {
+                        choreService = new ChoreService("hbaseKerberosSink");
+                        choreService.scheduleChore(authChore);
+                    }
+
+                    return ConnectionFactory.createConnection(finalConf);
+                } catch (IOException e) {
+                    LOG.error("Get connection fail with config: {}", finalConf);
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+    }
+
     @Override
     public void writeRecord(Tuple2 tuple2) {
         Tuple2<Boolean, Row> tupleTrans = tuple2;
@@ -101,13 +164,14 @@ public void writeRecord(Tuple2 tuple2) {
     protected void dealInsert(Row record) {
         Put put = getPutByRow(record);
-        if (put == null) {
+        if (put == null || put.isEmpty()) {
+            outDirtyRecords.inc();
             return;
         }
 
         try {
             table.put(put);
-        } catch (IOException e) {
+        } catch (Exception e) {
             if (outDirtyRecords.getCount() % DIRTY_PRINT_FREQUENCY == 0 || LOG.isDebugEnabled()) {
                 LOG.error("record insert failed ..{}", record.toString());
                 LOG.error("", e);
@@ -194,7 +258,6 @@ public void close() throws IOException {
             conn = null;
         }
     }
-
     private HbaseOutputFormat() {
     }
@@ -231,11 +294,6 @@ public HbaseOutputFormatBuilder setRowkey(String rowkey) {
         return this;
     }
 
-    public HbaseOutputFormatBuilder setUpdateMode(String updateMode) {
-        format.updateMode = updateMode;
-        return this;
-    }
-
     public HbaseOutputFormatBuilder setColumnNames(String[] columnNames) {
         format.columnNames = columnNames;
         return this;
@@ -251,6 +309,42 @@ public HbaseOutputFormatBuilder setColumnNameFamily(Map columnNameFamily) {
         return this;
     }
 
+    public HbaseOutputFormatBuilder setKerberosAuthEnable(boolean kerberosAuthEnable) {
+        format.kerberosAuthEnable = kerberosAuthEnable;
+        return this;
+    }
+
+    public HbaseOutputFormatBuilder setRegionserverKeytabFile(String regionserverKeytabFile) {
+        format.regionserverKeytabFile = regionserverKeytabFile;
+        return this;
+    }
+
+    public HbaseOutputFormatBuilder setRegionserverPrincipal(String regionserverPrincipal) {
+        format.regionserverPrincipal = regionserverPrincipal;
+        return this;
+    }
+
+    public HbaseOutputFormatBuilder setSecurityKrb5Conf(String securityKrb5Conf) {
+        format.securityKrb5Conf = securityKrb5Conf;
+        return this;
+    }
+
+    public HbaseOutputFormatBuilder setZookeeperSaslClient(String zookeeperSaslClient) {
+        format.zookeeperSaslClient = zookeeperSaslClient;
+        return this;
+    }
+
+    public HbaseOutputFormatBuilder setClientPrincipal(String clientPrincipal) {
+        format.clientPrincipal = clientPrincipal;
+        return this;
+    }
+
+    public HbaseOutputFormatBuilder setClientKeytabFile(String clientKeytabFile) {
+        format.clientKeytabFile = clientKeytabFile;
+        return this;
+    }
+
     public HbaseOutputFormat finish() {
         Preconditions.checkNotNull(format.host, "zookeeperQuorum should be specified");
         Preconditions.checkNotNull(format.tableName, "tableName should be specified");
@@ -265,7 +359,7
@@ public HbaseOutputFormat finish() { String[] columns = keySet.toArray(new String[keySet.size()]); for (int i = 0; i < columns.length; ++i) { String col = columns[i]; - String[] part = StringUtils.split(col, ":"); + String[] part = col.split(":"); families[i] = part[0]; qualifiers[i] = part[1]; } @@ -278,5 +372,39 @@ public HbaseOutputFormat finish() { } + private void fillSyncKerberosConfig(org.apache.hadoop.conf.Configuration config, String regionserverPrincipal, + String zookeeperSaslClient, String securityKrb5Conf) throws IOException { + if (StringUtils.isEmpty(regionserverPrincipal)) { + throw new IllegalArgumentException("Must provide regionserverPrincipal when authentication is Kerberos"); + } + config.set(HbaseConfigUtils.KEY_HBASE_MASTER_KERBEROS_PRINCIPAL, regionserverPrincipal); + config.set(HbaseConfigUtils.KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL, regionserverPrincipal); + config.set(HbaseConfigUtils.KEY_HBASE_SECURITY_AUTHORIZATION, "true"); + config.set(HbaseConfigUtils.KEY_HBASE_SECURITY_AUTHENTICATION, "kerberos"); + + + if (!StringUtils.isEmpty(zookeeperSaslClient)) { + System.setProperty(HbaseConfigUtils.KEY_ZOOKEEPER_SASL_CLIENT, zookeeperSaslClient); + } + + if (!StringUtils.isEmpty(securityKrb5Conf)) { + String krb5ConfPath = System.getProperty("user.dir") + File.separator + securityKrb5Conf; + LOG.info("krb5ConfPath:{}", krb5ConfPath); + System.setProperty(HbaseConfigUtils.KEY_JAVA_SECURITY_KRB5_CONF, krb5ConfPath); + } + } + + @Override + public String toString() { + return "HbaseOutputFormat kerberos{" + + "kerberosAuthEnable=" + kerberosAuthEnable + + ", regionserverKeytabFile='" + regionserverKeytabFile + '\'' + + ", regionserverPrincipal='" + regionserverPrincipal + '\'' + + ", securityKrb5Conf='" + securityKrb5Conf + '\'' + + ", zookeeperSaslClient='" + zookeeperSaslClient + '\'' + + ", clientPrincipal='" + clientPrincipal + '\'' + + ", clientKeytabFile='" + clientKeytabFile + '\'' + + '}'; + } } diff --git a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseSink.java b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseSink.java index 13bd98b70..09f5944b4 100644 --- a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseSink.java +++ b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseSink.java @@ -52,6 +52,18 @@ public class HbaseSink implements RetractStreamTableSink, IStreamSinkGener< protected String tableName; protected String updateMode; protected String rowkey; + protected String registerTabName; + + protected boolean kerberosAuthEnable; + protected String regionserverKeytabFile; + protected String regionserverPrincipal; + protected String securityKrb5Conf; + protected String zookeeperSaslClient; + + private String clientPrincipal; + private String clientKeytabFile; + private int parallelism = -1; + public HbaseSink() { // TO DO NOTHING @@ -66,20 +78,40 @@ public HbaseSink genStreamSink(AbstractTargetTableInfo targetTableInfo) { this.tableName = hbaseTableInfo.getTableName(); this.rowkey = hbaseTableInfo.getRowkey(); this.columnNameFamily = hbaseTableInfo.getColumnNameFamily(); - this.updateMode = hbaseTableInfo.getUpdateMode(); + this.registerTabName = hbaseTableInfo.getName(); + + this.kerberosAuthEnable = hbaseTableInfo.isKerberosAuthEnable(); + this.regionserverKeytabFile = hbaseTableInfo.getRegionserverKeytabFile(); + this.regionserverPrincipal = hbaseTableInfo.getRegionserverPrincipal(); + this.securityKrb5Conf = hbaseTableInfo.getSecurityKrb5Conf(); + 
this.zookeeperSaslClient = hbaseTableInfo.getZookeeperSaslClient(); + + this.clientKeytabFile = hbaseTableInfo.getClientKeytabFile(); + this.clientPrincipal = hbaseTableInfo.getClientPrincipal(); + + Integer tmpSinkParallelism = hbaseTableInfo.getParallelism(); + if (tmpSinkParallelism != null) { + this.parallelism = tmpSinkParallelism; + } return this; } @Override public void emitDataStream(DataStream> dataStream) { HbaseOutputFormat.HbaseOutputFormatBuilder builder = HbaseOutputFormat.buildHbaseOutputFormat(); - builder.setHost(this.zookeeperQuorum) - .setZkParent(this.parent) - .setTable(this.tableName) - .setRowkey(rowkey) - .setUpdateMode(updateMode) - .setColumnNames(fieldNames) - .setColumnNameFamily(columnNameFamily); + builder.setHost(this.zookeeperQuorum).setZkParent(this.parent).setTable(this.tableName); + + builder.setRowkey(rowkey); + builder.setColumnNames(fieldNames); + builder.setColumnNameFamily(columnNameFamily); + builder.setKerberosAuthEnable(kerberosAuthEnable); + builder.setRegionserverKeytabFile(regionserverKeytabFile); + builder.setRegionserverPrincipal(regionserverPrincipal); + builder.setSecurityKrb5Conf(securityKrb5Conf); + builder.setZookeeperSaslClient(zookeeperSaslClient); + + builder.setClientPrincipal(clientPrincipal); + builder.setClientKeytabFile(clientKeytabFile); HbaseOutputFormat outputFormat = builder.finish(); RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat); diff --git a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/table/HbaseSinkParser.java b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/table/HbaseSinkParser.java index 001443daa..5105e0fc0 100644 --- a/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/table/HbaseSinkParser.java +++ b/hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/table/HbaseSinkParser.java @@ -55,6 +55,16 @@ public class HbaseSinkParser extends AbstractTableParser { public static final String UPDATE_KEY = "updateMode"; + public static final String KERBEROS_AUTH_ENABLE_KEY = "kerberosAuthEnable"; + public static final String REGIONSERVER_KEYTAB_FILE_KEY = "regionserverKeytabFile"; + public static final String REGIONSERVER_PRINCIPAL_KEY = "regionserverPrincipal"; + public static final String SECURITY_KRB5_CONF_KEY = "securityKrb5Conf"; + public static final String ZOOKEEPER_SASL_CLINT_KEY = "zookeeperSaslClient"; + + public static final String CLIENT_PRINCIPAL_KEY = "clientPrincipal"; + public static final String CLIENT_KEYTABFILE_KEY = "clientKeytabFile"; + + @Override protected boolean fieldNameNeedsUpperCase() { return false; @@ -73,6 +83,15 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map hbaseConfig = Maps.newHashMap(); + public HbaseTableInfo(){ setType(CURR_TYPE); } @@ -149,4 +167,69 @@ public String getType() { return super.getType().toLowerCase(); } + + public Map getHbaseConfig() { + return hbaseConfig; + } + + public void setHbaseConfig(Map hbaseConfig) { + this.hbaseConfig = hbaseConfig; + } + + public boolean isKerberosAuthEnable() { + return kerberosAuthEnable; + } + + public void setKerberosAuthEnable(boolean kerberosAuthEnable) { + this.kerberosAuthEnable = kerberosAuthEnable; + } + + public String getRegionserverKeytabFile() { + return regionserverKeytabFile; + } + + public void setRegionserverKeytabFile(String regionserverKeytabFile) { + this.regionserverKeytabFile = regionserverKeytabFile; + } + + public String getRegionserverPrincipal() { + return regionserverPrincipal; + } + + public 
void setRegionserverPrincipal(String regionserverPrincipal) { + this.regionserverPrincipal = regionserverPrincipal; + } + + public String getSecurityKrb5Conf() { + return securityKrb5Conf; + } + + public void setSecurityKrb5Conf(String securityKrb5Conf) { + this.securityKrb5Conf = securityKrb5Conf; + } + + public String getZookeeperSaslClient() { + return zookeeperSaslClient; + } + + public void setZookeeperSaslClient(String zookeeperSaslClient) { + this.zookeeperSaslClient = zookeeperSaslClient; + } + + public String getClientPrincipal() { + return clientPrincipal; + } + + public void setClientPrincipal(String clientPrincipal) { + this.clientPrincipal = clientPrincipal; + } + + public String getClientKeytabFile() { + return clientKeytabFile; + } + + public void setClientKeytabFile(String clientKeytabFile) { + this.clientKeytabFile = clientKeytabFile; + } + } diff --git a/launcher/pom.xml b/launcher/pom.xml index ea921d87a..df9e8ae29 100644 --- a/launcher/pom.xml +++ b/launcher/pom.xml @@ -38,7 +38,7 @@ com.alibaba fastjson - 1.2.7 + 1.2.70 diff --git a/launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/ConfigParseUtil.java b/launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/ConfigParseUtil.java new file mode 100644 index 000000000..5acdb59a1 --- /dev/null +++ b/launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/ConfigParseUtil.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.dtstack.flink.sql.launcher.perjob;
+
+import org.apache.commons.io.Charsets;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Parses user-supplied path lists from launcher options.
+ * Date: 2019/12/28
+ * Company: www.dtstack.com
+ * @author maqi
+ */
+public class ConfigParseUtil {
+
+    public static List<String> parsePathFromStr(String pathStr) throws UnsupportedEncodingException {
+        String decodedPath = URLDecoder.decode(pathStr, Charsets.UTF_8.toString());
+        // the launcher passes a JSON-style array such as ["path1","path2"]; strip the brackets and quotes
+        if (decodedPath.length() > 2) {
+            decodedPath = decodedPath.substring(1, decodedPath.length() - 1).replace("\"", "");
+        }
+        return Arrays.asList(decodedPath.split(","));
+    }
+}
diff --git a/launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java b/launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java
index 5f531f447..f83779f4c 100644
--- a/launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java
+++ b/launcher/src/main/java/com/dtstack/flink/sql/launcher/perjob/PerJobClusterClientBuilder.java
@@ -37,6 +37,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.UnsupportedEncodingException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.ArrayList;
@@ -81,7 +82,7 @@ public void init(String yarnConfDir, Configuration flinkConfig, Properties userC
     }
 
     public AbstractYarnClusterDescriptor createPerJobClusterDescriptor(String flinkJarPath, Options launcherOptions, JobGraph jobGraph)
-            throws MalformedURLException {
+            throws MalformedURLException, UnsupportedEncodingException {
         String flinkConf = StringUtils.isEmpty(launcherOptions.getFlinkconf()) ? DEFAULT_CONF_DIR : launcherOptions.getFlinkconf();
         AbstractYarnClusterDescriptor clusterDescriptor = getClusterDescriptor(flinkConfig, yarnConf, flinkConf);
@@ -105,6 +106,7 @@
         } else {
             throw new RuntimeException("The Flink jar path is null");
         }
+
         // classpath mode: every node must already have the plugin jars locally
         String pluginLoadMode = launcherOptions.getPluginLoadMode();
         if (StringUtils.equalsIgnoreCase(pluginLoadMode, EPluginLoadMode.CLASSPATH.name())) {
@@ -117,6 +119,14 @@
                     + " Currently only classpath and shipfile are supported.");
         }
 
+        // add user customized files (e.g. keytab and krb5.conf) to the yarn ship files
+        if (!StringUtils.isBlank(launcherOptions.getAddShipfile())) {
+            List<String> paths = ConfigParseUtil.parsePathFromStr(launcherOptions.getAddShipfile());
+            paths.forEach(path -> shipFiles.add(new File(path)));
+        }
+
         clusterDescriptor.addShipFiles(shipFiles);
         clusterDescriptor.setName(launcherOptions.getName());
         String queue = launcherOptions.getQueue();
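A small usage sketch of the path parsing above (the input string is hypothetical, shaped like what the launcher receives after JSON serialization):

```java
import java.util.List;
import com.dtstack.flink.sql.launcher.perjob.ConfigParseUtil;

public class ShipfileParseDemo {
    public static void main(String[] args) throws Exception {
        String addShipfile = "[\"/opt/keytab/test.keytab\",\"/opt/krb5/krb5.conf\"]";
        List<String> paths = ConfigParseUtil.parsePathFromStr(addShipfile);
        // -> [/opt/keytab/test.keytab, /opt/krb5/krb5.conf],
        // each of which is then added to the yarn ship files
        paths.forEach(System.out::println);
    }
}
```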
diff --git a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java
index 082c546e9..23b7b19d4 100644
--- a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java
+++ b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java
@@ -93,7 +93,6 @@ protected void initCache() throws SQLException {
     protected void reloadCache() {
         //reload cacheRef and replace to old cacheRef
         Map<String, List<Map<String, Object>>> newCache = Maps.newConcurrentMap();
-        cacheRef.set(newCache);
         try {
             loadData(newCache);
         } catch (SQLException e) {
@@ -123,6 +122,11 @@ public void flatMap(CRow value, Collector<CRow> out) throws Exception {
         List<Map<String, Object>> cacheList = cacheRef.get().get(cacheKey);
         if (CollectionUtils.isEmpty(cacheList) && sideInfo.getJoinType() == JoinType.LEFT) {
             out.collect(new CRow(fillData(value.row(), null), value.change()));
+            return;
+        }
+
+        if (CollectionUtils.isEmpty(cacheList)) {
+            return;
         }
 
         cacheList.forEach(one -> out.collect(new CRow(fillData(value.row(), one), value.change())));
diff --git a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java
index 9d2b9671f..57586dbc9 100644
--- a/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java
+++ b/rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java
@@ -131,35 +131,41 @@ private void connectWithRetry(Map<String, Object> inputParams, CRow input, ResultFuture<CRow> resultFuture, SQLClient rdbSqlClient) {
         AtomicLong failCounter = new AtomicLong(0);
         AtomicBoolean finishFlag = new AtomicBoolean(false);
         while(!finishFlag.get()){
-            CountDownLatch latch = new CountDownLatch(1);
-            rdbSqlClient.getConnection(conn -> {
-                try {
-                    if(conn.failed()){
-                        connectionStatus.set(false);
-                        if(failCounter.getAndIncrement() % 1000 == 0){
-                            LOG.error("getConnection error", conn.cause());
-                        }
-                        if(failCounter.get() >= sideInfo.getSideTableInfo().getConnectRetryMaxNum(100)){
-                            resultFuture.completeExceptionally(conn.cause());
-                            finishFlag.set(true);
-                        }
-                        return;
-                    }
-                    connectionStatus.set(true);
-                    ScheduledFuture timerFuture = registerTimer(input, resultFuture);
-                    cancelTimerWhenComplete(resultFuture, timerFuture);
-                    handleQuery(conn.result(), inputParams, input, resultFuture);
-                    finishFlag.set(true);
-                } catch (Exception e) {
-                    dealFillDataError(input, resultFuture, e);
-                } finally {
-                    latch.countDown();
-                }
-            });
-            try {
-                latch.await();
-            } catch (InterruptedException e) {
-                LOG.error("", e);
+            try{
+                CountDownLatch latch = new CountDownLatch(1);
+                rdbSqlClient.getConnection(conn -> {
+                    try {
+                        if(conn.failed()){
+                            connectionStatus.set(false);
+                            if(failCounter.getAndIncrement() % 1000 == 0){
+                                LOG.error("getConnection error", conn.cause());
+                            }
+                            if(failCounter.get() >= sideInfo.getSideTableInfo().getConnectRetryMaxNum(100)){
+                                resultFuture.completeExceptionally(conn.cause());
+                                finishFlag.set(true);
+                            }
+                            return;
+                        }
+                        connectionStatus.set(true);
+                        ScheduledFuture timerFuture = registerTimer(input, resultFuture);
+                        cancelTimerWhenComplete(resultFuture, timerFuture);
+                        handleQuery(conn.result(), inputParams, input, resultFuture);
+                        finishFlag.set(true);
+                    } catch (Exception e) {
+                        dealFillDataError(input, resultFuture, e);
+                    } finally {
+                        latch.countDown();
+                    }
+                });
+                try {
+                    latch.await();
+                } catch (InterruptedException e) {
+                    LOG.error("wait connection interrupted", e);
+                }
+
+            } catch (Exception e){
+                // the data source task queue overflowed; mark the connection bad and retry
+                connectionStatus.set(false);
             }
             if(!finishFlag.get()){
                 try {
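The reworked `connectWithRetry` wraps the whole acquire-and-query attempt in its own try so that a pool/queue overflow inside `getConnection` no longer kills the operator. The retry shape, distilled into a self-contained sketch with a hypothetical `acquire` helper standing in for the vert.x client:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

public class RetryShapeDemo {
    static final int MAX_RETRIES = 3;

    // stand-in for rdbSqlClient.getConnection(...): hands the callback a connection or null
    static void acquire(Consumer<Object> cb) { cb.accept(new Object()); }

    public static void main(String[] args) throws Exception {
        AtomicBoolean finished = new AtomicBoolean(false);
        AtomicLong failures = new AtomicLong(0);
        while (!finished.get()) {
            try {
                CountDownLatch latch = new CountDownLatch(1);
                acquire(conn -> {
                    try {
                        if (conn == null && failures.incrementAndGet() >= MAX_RETRIES) {
                            finished.set(true); // give up and surface the error
                            return;
                        }
                        finished.set(true);     // query ran; done
                    } finally {
                        latch.countDown();
                    }
                });
                latch.await();
            } catch (Exception e) {
                // e.g. the async client's task queue overflowed; loop and retry
            }
        }
    }
}
```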
diff --git a/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/writer/AppendOnlyWriter.java b/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/writer/AppendOnlyWriter.java
index 3559d4376..7c3ff4b09 100644
--- a/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/writer/AppendOnlyWriter.java
+++ b/rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/writer/AppendOnlyWriter.java
@@ -118,11 +118,11 @@ public void executeUpdate(Connection connection) {
                     } catch (SQLException e1) {
                         throw new RuntimeException(e1);
                     }
-                    metricOutputFormat.outDirtyRecords.inc();
                     if (metricOutputFormat.outDirtyRecords.getCount() % DIRTYDATA_PRINT_FREQUENTY == 0 || LOG.isDebugEnabled()) {
                         LOG.error("record insert failed ,this row is {}", row.toString());
                         LOG.error("", e);
                     }
+                    metricOutputFormat.outDirtyRecords.inc();
                 }
             });
             rows.clear();
diff --git a/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisOutputFormat.java b/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisOutputFormat.java
index ae4fe5a4b..ab97cf60e 100644
--- a/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisOutputFormat.java
+++ b/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisOutputFormat.java
@@ -19,6 +19,8 @@
 package com.dtstack.flink.sql.sink.redis;
 
 import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
+import com.dtstack.flink.sql.sink.redis.enums.RedisType;
+import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -35,11 +37,7 @@
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 
 /**
  * @author yanxi
@@ -49,7 +47,7 @@ public class RedisOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolean
 
     private String url;
 
-    private String database;
+    private String database = "0";
 
     private String tableName;
 
@@ -71,7 +69,7 @@ public class RedisOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolean
 
     protected List<String> primaryKeys;
 
-    protected int timeout;
+    protected int timeout = 10000;
 
     private JedisPool pool;
 
@@ -121,29 +119,21 @@ private void establishConnection() {
             String[] ipPortPair = StringUtils.split(ipPort, ":");
             addresses.add(new HostAndPort(ipPortPair[0].trim(), Integer.valueOf(ipPortPair[1].trim())));
         }
-        if (timeout == 0){
-            timeout = 10000;
-        }
-        if (database == null)
-        {
-            database = "0";
-        }
-        switch (redisType){
-            //standalone
-            case 1:
+        switch (RedisType.parse(redisType)){
+            case STANDALONE:
                 pool = new JedisPool(poolConfig, firstIp, Integer.parseInt(firstPort), timeout, password, Integer.parseInt(database));
                 jedis = pool.getResource();
                 break;
-            //sentinel
-            case 2:
+            case SENTINEL:
                 jedisSentinelPool = new JedisSentinelPool(masterName, ipPorts, poolConfig, timeout, password, Integer.parseInt(database));
                 jedis = jedisSentinelPool.getResource();
                 break;
-            //cluster
-            case 3:
+            case CLUSTER:
                 jedis = new JedisCluster(addresses, timeout, timeout, 10, password, poolConfig);
+                break;
             default:
+                throw new RuntimeException("unsupported redis type[" + redisType + "]");
         }
     }
@@ -158,36 +148,14 @@ public void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
         if (row.getArity() != fieldNames.length) {
             return;
         }
-
-        HashMap<String, Integer> map = new HashMap<>(8);
-        for (String primaryKey : primaryKeys) {
-            for (int i = 0; i < fieldNames.length; i++) {
-                if (fieldNames[i].equals(primaryKey)) {
-                    map.put(primaryKey, i);
-                }
-            }
-        }
-
-        List<String> kvList = new LinkedList<>();
-        for (String primaryKey : primaryKeys){
-            StringBuilder primaryKv = new StringBuilder();
-            int index = map.get(primaryKey).intValue();
-            primaryKv.append(primaryKey).append(":").append(row.getField(index));
-            kvList.add(primaryKv.toString());
-        }
-
-        String perKey = String.join(":", kvList);
-        for (int i = 0; i < fieldNames.length; i++) {
-            StringBuilder key = new StringBuilder();
-            key.append(tableName).append(":").append(perKey).append(":").append(fieldNames[i]);
-
-            String value = "null";
-            Object field = row.getField(i);
-            if (field != null) {
-                value = field.toString();
-            }
-            jedis.set(key.toString(), value);
+        Map<String, Object> refData = Maps.newHashMap();
+        for(int i = 0; i < fieldNames.length; i++){
+            refData.put(fieldNames[i], row.getField(i));
         }
+        String redisKey = buildCacheKey(refData);
+        refData.entrySet().forEach(e ->{
+            jedis.hset(redisKey, e.getKey(), String.valueOf(e.getValue()));
+        });
 
         if (outRecords.getCount() % ROW_PRINT_FREQUENCY == 0){
             LOG.info(record.toString());
@@ -211,6 +179,17 @@ public void close() throws IOException {
 
     }
 
+    public String buildCacheKey(Map<String, Object> refData) {
+        StringBuilder keyBuilder = new StringBuilder(tableName);
+        for(String primaryKey : primaryKeys){
+            if(!refData.containsKey(primaryKey)){
+                return null;
+            }
+            keyBuilder.append("_").append(refData.get(primaryKey));
+        }
+        return keyBuilder.toString();
+    }
+
     public static RedisOutputFormatBuilder buildRedisOutputFormat(){
         return new RedisOutputFormatBuilder();
     }
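Aside: the writeRecord/buildCacheKey rewrite above changes the Redis layout from one String key per column (table:pk:pkValue:field) to a single hash per row, keyed by the table name and the primary-key values joined with "_". Roughly what one row (id=1, name=foo) of a table sideTable with primary key id now produces; the table name, values, and host are illustrative:

    import redis.clients.jedis.Jedis;

    public class RedisLayoutDemo {
        public static void main(String[] args) {
            try (Jedis jedis = new Jedis("localhost", 6379)) {
                // new layout: one hash per row; buildCacheKey -> "sideTable_1"
                jedis.hset("sideTable_1", "id", "1");
                jedis.hset("sideTable_1", "name", "foo");
                // old layout: SET sideTable:id:1:id 1 / SET sideTable:id:1:name foo
            }
        }
    }
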
diff --git a/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/enums/RedisType.java b/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/enums/RedisType.java
new file mode 100644
index 000000000..7a4054dfc
--- /dev/null
+++ b/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/enums/RedisType.java
@@ -0,0 +1,33 @@
+package com.dtstack.flink.sql.sink.redis.enums;
+
+public enum RedisType {
+    /**
+     * standalone
+     */
+    STANDALONE(1),
+    /**
+     * sentinel
+     */
+    SENTINEL(2),
+    /**
+     * cluster
+     */
+    CLUSTER(3);
+    int type;
+    RedisType(int type){
+        this.type = type;
+    }
+
+    public int getType(){
+        return type;
+    }
+
+    public static RedisType parse(int redisType){
+        for(RedisType type : RedisType.values()){
+            if(type.getType() == redisType){
+                return type;
+            }
+        }
+        throw new RuntimeException("unsupported redis type[" + redisType + "]");
+    }
+}
diff --git a/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/table/RedisSinkParser.java b/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/table/RedisSinkParser.java
index e965eeecb..8961f7da9 100644
--- a/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/table/RedisSinkParser.java
+++ b/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/table/RedisSinkParser.java
@@ -50,11 +50,11 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map
-        List<String> primaryKeysList = Lists.newArrayList();
         if (!StringUtils.isEmpty(primaryKeysStr)) {
+            List<String> primaryKeysList = Lists.newArrayList();
             primaryKeysList = Arrays.asList(StringUtils.split(primaryKeysStr, ","));
+            redisTableInfo.setPrimaryKeys(primaryKeysList);
         }
-        redisTableInfo.setPrimaryKeys(primaryKeysList);
 
         return redisTableInfo;
     }
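Aside: RedisType.parse replaces the bare case 1/2/3 branches in establishConnection and fails fast on unknown modes. A short usage sketch (the demo class is hypothetical):

    import com.dtstack.flink.sql.sink.redis.enums.RedisType;

    public class RedisTypeDemo {
        public static void main(String[] args) {
            System.out.println(RedisType.parse(2));           // SENTINEL
            System.out.println(RedisType.parse(2).getType()); // 2
            RedisType.parse(9); // throws RuntimeException: unsupported redis type[9]
        }
    }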