From bd58b48bcb34ec1e1fe3d54e32ed2df6e9c85908 Mon Sep 17 00:00:00 2001 From: EricZeng Date: Wed, 5 Jul 2023 13:43:19 +0800 Subject: [PATCH] =?UTF-8?q?[Optimize]Connector=E5=A2=9E=E6=94=B9=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=E7=9A=84configs=E5=AD=97=E6=AE=B5=E5=90=8D=E8=B0=83?= =?UTF-8?q?=E6=95=B4=E4=B8=BAconfig=20(#1080)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1、保持和原生一致; 2、当前是兼容状态,可同时支持configs和config; --- .../connector/impl/ConnectorManagerImpl.java | 8 +++---- .../mm2/impl/MirrorMakerManagerImpl.java | 18 +++++++-------- .../connect/connector/ConnectorCreateDTO.java | 18 ++++++++++----- .../dto/connect/mm2/MirrorMakerCreateDTO.java | 2 +- .../cluster/mm2/MirrorMakerBaseStateVO.java | 1 - .../v3/connect/KafkaConnectorController.java | 22 +++++++++++++++++-- 6 files changed, 46 insertions(+), 23 deletions(-) diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/impl/ConnectorManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/impl/ConnectorManagerImpl.java index 5800b26f0..191afc6bb 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/impl/ConnectorManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/impl/ConnectorManagerImpl.java @@ -49,9 +49,9 @@ public Result updateConnectorConfig(Long connectClusterId, String connecto @Override public Result createConnector(ConnectorCreateDTO dto, String operator) { - dto.getConfigs().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName()); + dto.getSuitableConfig().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName()); - Result createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), operator); + Result createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator); if (createResult.failed()) { return Result.buildFromIgnoreData(createResult); } @@ -67,9 +67,9 @@ public Result createConnector(ConnectorCreateDTO dto, String operator) { @Override public Result createConnector(ConnectorCreateDTO dto, String heartbeatName, String checkpointName, String operator) { - dto.getConfigs().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName()); + dto.getSuitableConfig().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName()); - Result createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), operator); + Result createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator); if (createResult.failed()) { return Result.buildFromIgnoreData(createResult); } diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/impl/MirrorMakerManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/impl/MirrorMakerManagerImpl.java index 99f2747f2..de10b0f00 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/impl/MirrorMakerManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/impl/MirrorMakerManagerImpl.java @@ -132,17 +132,17 @@ public Result createMirrorMaker(MirrorMakerCreateDTO dto, String operator) } else if (checkpointResult.failed() && checkpointResult.failed()) { return Result.buildFromRSAndMsg( ResultStatus.KAFKA_CONNECTOR_OPERATE_FAILED, - String.format("创建 checkpoint & heartbeat 失败.\n失败信息分别为:%s\n\n%s", checkpointResult.getMessage(), heartbeatResult.getMessage()) + String.format("创建 checkpoint & heartbeat 失败.%n失败信息分别为:%s%n%n%s", checkpointResult.getMessage(), heartbeatResult.getMessage()) ); } else if (checkpointResult.failed()) { return Result.buildFromRSAndMsg( ResultStatus.KAFKA_CONNECTOR_OPERATE_FAILED, - String.format("创建 checkpoint 失败.\n失败信息分别为:%s", checkpointResult.getMessage()) + String.format("创建 checkpoint 失败.%n失败信息分别为:%s", checkpointResult.getMessage()) ); } else{ return Result.buildFromRSAndMsg( ResultStatus.KAFKA_CONNECTOR_OPERATE_FAILED, - String.format("创建 heartbeat 失败.\n失败信息分别为:%s", heartbeatResult.getMessage()) + String.format("创建 heartbeat 失败.%n失败信息分别为:%s", heartbeatResult.getMessage()) ); } } @@ -194,7 +194,7 @@ public Result modifyMirrorMakerConfig(MirrorMakerCreateDTO dto, String ope return rv; } - return connectorService.updateConnectorConfig(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), operator); + return connectorService.updateConnectorConfig(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator); } @Override @@ -426,7 +426,7 @@ public Result> getMM2Configs(Long connectClusterId, String conn public Result> validateConnectors(MirrorMakerCreateDTO dto) { List voList = new ArrayList<>(); - Result infoResult = pluginService.validateConfig(dto.getConnectClusterId(), dto.getConfigs()); + Result infoResult = pluginService.validateConfig(dto.getConnectClusterId(), dto.getSuitableConfig()); if (infoResult.failed()) { return Result.buildFromIgnoreData(infoResult); } @@ -480,11 +480,11 @@ public Result checkCreateMirrorMakerParamAndUnifyData(MirrorMakerCreateDTO return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(connectCluster.getKafkaClusterPhyId())); } - if (!dto.getConfigs().containsKey(CONNECTOR_CLASS_FILED_NAME)) { + if (!dto.getSuitableConfig().containsKey(CONNECTOR_CLASS_FILED_NAME)) { return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "SourceConnector缺少connector.class"); } - if (!MIRROR_MAKER_SOURCE_CONNECTOR_TYPE.equals(dto.getConfigs().getProperty(CONNECTOR_CLASS_FILED_NAME))) { + if (!MIRROR_MAKER_SOURCE_CONNECTOR_TYPE.equals(dto.getSuitableConfig().getProperty(CONNECTOR_CLASS_FILED_NAME))) { return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "SourceConnector的connector.class类型错误"); } @@ -589,9 +589,7 @@ public static List supplyData2ClusterMirrorMakerOv } } - voList.forEach(elem -> { - elem.setMetricLines(metricLineMap.get(elem.getConnectClusterId() + "#" + elem.getConnectorName())); - }); + voList.forEach(elem -> elem.setMetricLines(metricLineMap.get(elem.getConnectClusterId() + "#" + elem.getConnectorName()))); return voList; } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/connector/ConnectorCreateDTO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/connector/ConnectorCreateDTO.java index 46639f0e4..038e617f4 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/connector/ConnectorCreateDTO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/connector/ConnectorCreateDTO.java @@ -1,12 +1,12 @@ package com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.ClusterConnectorDTO; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.Data; import lombok.NoArgsConstructor; -import javax.validation.constraints.NotNull; import java.util.Properties; /** @@ -14,15 +14,23 @@ * @date 2022-10-17 */ @Data +@JsonIgnoreProperties(ignoreUnknown = true) @NoArgsConstructor @ApiModel(description = "创建Connector") public class ConnectorCreateDTO extends ClusterConnectorDTO { - @NotNull(message = "configs不允许为空") - @ApiModelProperty(value = "配置", example = "") + @Deprecated + @ApiModelProperty(value = "配置, 优先使用config字段,3.5.0版本将删除该字段", example = "") protected Properties configs; - public ConnectorCreateDTO(Long connectClusterId, String connectorName, Properties configs) { + @ApiModelProperty(value = "配置", example = "") + protected Properties config; + + public ConnectorCreateDTO(Long connectClusterId, String connectorName, Properties config) { super(connectClusterId, connectorName); - this.configs = configs; + this.config = config; + } + + public Properties getSuitableConfig() { + return config != null? config: configs; } } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/mm2/MirrorMakerCreateDTO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/mm2/MirrorMakerCreateDTO.java index fa9867ecb..c2a60dacc 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/mm2/MirrorMakerCreateDTO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/mm2/MirrorMakerCreateDTO.java @@ -40,7 +40,7 @@ public void unifyData(Long sourceKafkaClusterId, String sourceBootstrapServers, targetKafkaProps = new Properties(); } - this.unifyData(this.configs, sourceKafkaClusterId, sourceBootstrapServers, sourceKafkaProps, targetKafkaClusterId, targetBootstrapServers, targetKafkaProps); + this.unifyData(this.getSuitableConfig(), sourceKafkaClusterId, sourceBootstrapServers, sourceKafkaProps, targetKafkaClusterId, targetBootstrapServers, targetKafkaProps); if (heartbeatConnectorConfigs != null) { this.unifyData(this.heartbeatConnectorConfigs, sourceKafkaClusterId, sourceBootstrapServers, sourceKafkaProps, targetKafkaClusterId, targetBootstrapServers, targetKafkaProps); diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/mm2/MirrorMakerBaseStateVO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/mm2/MirrorMakerBaseStateVO.java index 04aed0356..33ae15daf 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/mm2/MirrorMakerBaseStateVO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/mm2/MirrorMakerBaseStateVO.java @@ -13,7 +13,6 @@ @Data @ApiModel(description = "集群MM2状态信息") public class MirrorMakerBaseStateVO extends BaseVO { - @ApiModelProperty(value = "worker数", example = "1") private Integer workerCount; diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/KafkaConnectorController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/KafkaConnectorController.java index d60314bbb..b03ca7cce 100644 --- a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/KafkaConnectorController.java +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/connect/KafkaConnectorController.java @@ -14,6 +14,7 @@ import com.xiaojukeji.know.streaming.km.common.constant.Constant; import com.xiaojukeji.know.streaming.km.common.enums.connect.ConnectActionEnum; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService; import com.xiaojukeji.know.streaming.km.core.service.connect.plugin.PluginService; import io.swagger.annotations.Api; @@ -44,6 +45,10 @@ public class KafkaConnectorController { @PostMapping(value = "connectors") @ResponseBody public Result createConnector(@Validated @RequestBody ConnectorCreateDTO dto) { + if (ValidateUtils.isNull(dto.getSuitableConfig())) { + return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "config字段不能为空"); + } + return connectorManager.createConnector(dto, HttpRequestUtil.getOperator()); } @@ -73,14 +78,27 @@ public Result operateConnectors(@Validated @RequestBody ConnectorActionDTO @PutMapping(value ="connectors-config") @ResponseBody public Result modifyConnectors(@Validated @RequestBody ConnectorCreateDTO dto) { - return connectorManager.updateConnectorConfig(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), HttpRequestUtil.getOperator()); + if (ValidateUtils.isNull(dto.getSuitableConfig())) { + return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "config字段不能为空"); + } + + return connectorManager.updateConnectorConfig( + dto.getConnectClusterId(), + dto.getConnectorName(), + dto.getSuitableConfig(), + HttpRequestUtil.getOperator() + ); } @ApiOperation(value = "校验Connector配置", notes = "") @PutMapping(value ="connectors-config/validate") @ResponseBody public Result validateConnectors(@Validated @RequestBody ConnectorCreateDTO dto) { - Result infoResult = pluginService.validateConfig(dto.getConnectClusterId(), dto.getConfigs()); + if (ValidateUtils.isNull(dto.getSuitableConfig())) { + return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "config字段不能为空"); + } + + Result infoResult = pluginService.validateConfig(dto.getConnectClusterId(), dto.getSuitableConfig()); if (infoResult.failed()) { return Result.buildFromIgnoreData(infoResult); }