Skip to content

Commit

Permalink
[Optimize]Connector增改接口的configs字段名调整为config (#1080)
Browse files Browse the repository at this point in the history
1、保持和原生一致;
2、当前是兼容状态,可同时支持configs和config;
  • Loading branch information
ZQKC authored Jul 5, 2023
1 parent 0cd071c commit bd58b48
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ public Result<Void> updateConnectorConfig(Long connectClusterId, String connecto

@Override
public Result<Void> createConnector(ConnectorCreateDTO dto, String operator) {
dto.getConfigs().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName());
dto.getSuitableConfig().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName());

Result<KSConnectorInfo> createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), operator);
Result<KSConnectorInfo> createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
if (createResult.failed()) {
return Result.buildFromIgnoreData(createResult);
}
Expand All @@ -67,9 +67,9 @@ public Result<Void> createConnector(ConnectorCreateDTO dto, String operator) {

@Override
public Result<Void> createConnector(ConnectorCreateDTO dto, String heartbeatName, String checkpointName, String operator) {
dto.getConfigs().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName());
dto.getSuitableConfig().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName());

Result<KSConnectorInfo> createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), operator);
Result<KSConnectorInfo> createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
if (createResult.failed()) {
return Result.buildFromIgnoreData(createResult);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,17 +132,17 @@ public Result<Void> createMirrorMaker(MirrorMakerCreateDTO dto, String operator)
} else if (checkpointResult.failed() && checkpointResult.failed()) {
return Result.buildFromRSAndMsg(
ResultStatus.KAFKA_CONNECTOR_OPERATE_FAILED,
String.format("创建 checkpoint & heartbeat 失败.\n失败信息分别为:%s\n\n%s", checkpointResult.getMessage(), heartbeatResult.getMessage())
String.format("创建 checkpoint & heartbeat 失败.%n失败信息分别为:%s%n%n%s", checkpointResult.getMessage(), heartbeatResult.getMessage())
);
} else if (checkpointResult.failed()) {
return Result.buildFromRSAndMsg(
ResultStatus.KAFKA_CONNECTOR_OPERATE_FAILED,
String.format("创建 checkpoint 失败.\n失败信息分别为:%s", checkpointResult.getMessage())
String.format("创建 checkpoint 失败.%n失败信息分别为:%s", checkpointResult.getMessage())
);
} else{
return Result.buildFromRSAndMsg(
ResultStatus.KAFKA_CONNECTOR_OPERATE_FAILED,
String.format("创建 heartbeat 失败.\n失败信息分别为:%s", heartbeatResult.getMessage())
String.format("创建 heartbeat 失败.%n失败信息分别为:%s", heartbeatResult.getMessage())
);
}
}
Expand Down Expand Up @@ -194,7 +194,7 @@ public Result<Void> modifyMirrorMakerConfig(MirrorMakerCreateDTO dto, String ope
return rv;
}

return connectorService.updateConnectorConfig(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), operator);
return connectorService.updateConnectorConfig(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
}

@Override
Expand Down Expand Up @@ -426,7 +426,7 @@ public Result<List<Properties>> getMM2Configs(Long connectClusterId, String conn
public Result<List<ConnectConfigInfosVO>> validateConnectors(MirrorMakerCreateDTO dto) {
List<ConnectConfigInfosVO> voList = new ArrayList<>();

Result<ConnectConfigInfos> infoResult = pluginService.validateConfig(dto.getConnectClusterId(), dto.getConfigs());
Result<ConnectConfigInfos> infoResult = pluginService.validateConfig(dto.getConnectClusterId(), dto.getSuitableConfig());
if (infoResult.failed()) {
return Result.buildFromIgnoreData(infoResult);
}
Expand Down Expand Up @@ -480,11 +480,11 @@ public Result<Void> checkCreateMirrorMakerParamAndUnifyData(MirrorMakerCreateDTO
return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(connectCluster.getKafkaClusterPhyId()));
}

if (!dto.getConfigs().containsKey(CONNECTOR_CLASS_FILED_NAME)) {
if (!dto.getSuitableConfig().containsKey(CONNECTOR_CLASS_FILED_NAME)) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "SourceConnector缺少connector.class");
}

if (!MIRROR_MAKER_SOURCE_CONNECTOR_TYPE.equals(dto.getConfigs().getProperty(CONNECTOR_CLASS_FILED_NAME))) {
if (!MIRROR_MAKER_SOURCE_CONNECTOR_TYPE.equals(dto.getSuitableConfig().getProperty(CONNECTOR_CLASS_FILED_NAME))) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "SourceConnector的connector.class类型错误");
}

Expand Down Expand Up @@ -589,9 +589,7 @@ public static List<ClusterMirrorMakerOverviewVO> supplyData2ClusterMirrorMakerOv
}
}

voList.forEach(elem -> {
elem.setMetricLines(metricLineMap.get(elem.getConnectClusterId() + "#" + elem.getConnectorName()));
});
voList.forEach(elem -> elem.setMetricLines(metricLineMap.get(elem.getConnectClusterId() + "#" + elem.getConnectorName())));

return voList;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,36 @@
package com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.ClusterConnectorDTO;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.validation.constraints.NotNull;
import java.util.Properties;

/**
* @author zengqiao
* @date 2022-10-17
*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
@NoArgsConstructor
@ApiModel(description = "创建Connector")
public class ConnectorCreateDTO extends ClusterConnectorDTO {
@NotNull(message = "configs不允许为空")
@ApiModelProperty(value = "配置", example = "")
@Deprecated
@ApiModelProperty(value = "配置, 优先使用config字段,3.5.0版本将删除该字段", example = "")
protected Properties configs;

public ConnectorCreateDTO(Long connectClusterId, String connectorName, Properties configs) {
@ApiModelProperty(value = "配置", example = "")
protected Properties config;

public ConnectorCreateDTO(Long connectClusterId, String connectorName, Properties config) {
super(connectClusterId, connectorName);
this.configs = configs;
this.config = config;
}

public Properties getSuitableConfig() {
return config != null? config: configs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void unifyData(Long sourceKafkaClusterId, String sourceBootstrapServers,
targetKafkaProps = new Properties();
}

this.unifyData(this.configs, sourceKafkaClusterId, sourceBootstrapServers, sourceKafkaProps, targetKafkaClusterId, targetBootstrapServers, targetKafkaProps);
this.unifyData(this.getSuitableConfig(), sourceKafkaClusterId, sourceBootstrapServers, sourceKafkaProps, targetKafkaClusterId, targetBootstrapServers, targetKafkaProps);

if (heartbeatConnectorConfigs != null) {
this.unifyData(this.heartbeatConnectorConfigs, sourceKafkaClusterId, sourceBootstrapServers, sourceKafkaProps, targetKafkaClusterId, targetBootstrapServers, targetKafkaProps);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
@Data
@ApiModel(description = "集群MM2状态信息")
public class MirrorMakerBaseStateVO extends BaseVO {

@ApiModelProperty(value = "worker数", example = "1")
private Integer workerCount;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.connect.ConnectActionEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.plugin.PluginService;
import io.swagger.annotations.Api;
Expand Down Expand Up @@ -44,6 +45,10 @@ public class KafkaConnectorController {
@PostMapping(value = "connectors")
@ResponseBody
public Result<Void> createConnector(@Validated @RequestBody ConnectorCreateDTO dto) {
if (ValidateUtils.isNull(dto.getSuitableConfig())) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "config字段不能为空");
}

return connectorManager.createConnector(dto, HttpRequestUtil.getOperator());
}

Expand Down Expand Up @@ -73,14 +78,27 @@ public Result<Void> operateConnectors(@Validated @RequestBody ConnectorActionDTO
@PutMapping(value ="connectors-config")
@ResponseBody
public Result<Void> modifyConnectors(@Validated @RequestBody ConnectorCreateDTO dto) {
return connectorManager.updateConnectorConfig(dto.getConnectClusterId(), dto.getConnectorName(), dto.getConfigs(), HttpRequestUtil.getOperator());
if (ValidateUtils.isNull(dto.getSuitableConfig())) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "config字段不能为空");
}

return connectorManager.updateConnectorConfig(
dto.getConnectClusterId(),
dto.getConnectorName(),
dto.getSuitableConfig(),
HttpRequestUtil.getOperator()
);
}

@ApiOperation(value = "校验Connector配置", notes = "")
@PutMapping(value ="connectors-config/validate")
@ResponseBody
public Result<ConnectConfigInfosVO> validateConnectors(@Validated @RequestBody ConnectorCreateDTO dto) {
Result<ConnectConfigInfos> infoResult = pluginService.validateConfig(dto.getConnectClusterId(), dto.getConfigs());
if (ValidateUtils.isNull(dto.getSuitableConfig())) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "config字段不能为空");
}

Result<ConnectConfigInfos> infoResult = pluginService.validateConfig(dto.getConnectClusterId(), dto.getSuitableConfig());
if (infoResult.failed()) {
return Result.buildFromIgnoreData(infoResult);
}
Expand Down

0 comments on commit bd58b48

Please sign in to comment.