Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Optimize]统一DB元信息更新格式-Part1 #1125

Merged
merged 1 commit into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.connector.ConnectorStateVO;
import com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.OpConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.plugin.PluginService;
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService;
import org.apache.kafka.connect.runtime.AbstractStatus;
Expand All @@ -30,6 +31,9 @@ public class ConnectorManagerImpl implements ConnectorManager {
@Autowired
private ConnectorService connectorService;

@Autowired
private OpConnectorService opConnectorService;

@Autowired
private WorkerConnectorService workerConnectorService;

Expand All @@ -44,37 +48,37 @@ public Result<Void> updateConnectorConfig(Long connectClusterId, String connecto
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "Connector参数错误");
}

return connectorService.updateConnectorConfig(connectClusterId, connectorName, configs, operator);
return opConnectorService.updateConnectorConfig(connectClusterId, connectorName, configs, operator);
}

@Override
public Result<Void> createConnector(ConnectorCreateDTO dto, String operator) {
dto.getSuitableConfig().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName());

Result<KSConnectorInfo> createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
Result<KSConnectorInfo> createResult = opConnectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
if (createResult.failed()) {
return Result.buildFromIgnoreData(createResult);
}

Result<KSConnector> ksConnectorResult = connectorService.getAllConnectorInfoFromCluster(dto.getConnectClusterId(), dto.getConnectorName());
Result<KSConnector> ksConnectorResult = connectorService.getConnectorFromKafka(dto.getConnectClusterId(), dto.getConnectorName());
if (ksConnectorResult.failed()) {
return Result.buildFromRSAndMsg(ResultStatus.SUCCESS, "创建成功,但是获取元信息失败,页面元信息会存在1分钟延迟");
}

connectorService.addNewToDB(ksConnectorResult.getData());
opConnectorService.addNewToDB(ksConnectorResult.getData());
return Result.buildSuc();
}

@Override
public Result<Void> createConnector(ConnectorCreateDTO dto, String heartbeatName, String checkpointName, String operator) {
dto.getSuitableConfig().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName());

Result<KSConnectorInfo> createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
Result<KSConnectorInfo> createResult = opConnectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
if (createResult.failed()) {
return Result.buildFromIgnoreData(createResult);
}

Result<KSConnector> ksConnectorResult = connectorService.getAllConnectorInfoFromCluster(dto.getConnectClusterId(), dto.getConnectorName());
Result<KSConnector> ksConnectorResult = connectorService.getConnectorFromKafka(dto.getConnectClusterId(), dto.getConnectorName());
if (ksConnectorResult.failed()) {
return Result.buildFromRSAndMsg(ResultStatus.SUCCESS, "创建成功,但是获取元信息失败,页面元信息会存在1分钟延迟");
}
Expand All @@ -83,7 +87,7 @@ public Result<Void> createConnector(ConnectorCreateDTO dto, String heartbeatName
connector.setCheckpointConnectorName(checkpointName);
connector.setHeartbeatConnectorName(heartbeatName);

connectorService.addNewToDB(connector);
opConnectorService.addNewToDB(connector);
return Result.buildSuc();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.OpConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.mm2.MirrorMakerMetricService;
import com.xiaojukeji.know.streaming.km.core.service.connect.plugin.PluginService;
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService;
Expand Down Expand Up @@ -67,6 +68,9 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager {
@Autowired
private ConnectorService connectorService;

@Autowired
private OpConnectorService opConnectorService;

@Autowired
private WorkerConnectorService workerConnectorService;

Expand Down Expand Up @@ -156,20 +160,20 @@ public Result<Void> deleteMirrorMaker(Long connectClusterId, String sourceConnec

Result<Void> rv = Result.buildSuc();
if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) {
rv = connectorService.deleteConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
rv = opConnectorService.deleteConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}

if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) {
rv = connectorService.deleteConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
rv = opConnectorService.deleteConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}

return connectorService.deleteConnector(connectClusterId, sourceConnectorName, operator);
return opConnectorService.deleteConnector(connectClusterId, sourceConnectorName, operator);
}

@Override
Expand All @@ -181,20 +185,20 @@ public Result<Void> modifyMirrorMakerConfig(MirrorMakerCreateDTO dto, String ope

Result<Void> rv = Result.buildSuc();
if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName()) && dto.getCheckpointConnectorConfigs() != null) {
rv = connectorService.updateConnectorConfig(dto.getConnectClusterId(), connectorPO.getCheckpointConnectorName(), dto.getCheckpointConnectorConfigs(), operator);
rv = opConnectorService.updateConnectorConfig(dto.getConnectClusterId(), connectorPO.getCheckpointConnectorName(), dto.getCheckpointConnectorConfigs(), operator);
}
if (rv.failed()) {
return rv;
}

if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName()) && dto.getHeartbeatConnectorConfigs() != null) {
rv = connectorService.updateConnectorConfig(dto.getConnectClusterId(), connectorPO.getHeartbeatConnectorName(), dto.getHeartbeatConnectorConfigs(), operator);
rv = opConnectorService.updateConnectorConfig(dto.getConnectClusterId(), connectorPO.getHeartbeatConnectorName(), dto.getHeartbeatConnectorConfigs(), operator);
}
if (rv.failed()) {
return rv;
}

return connectorService.updateConnectorConfig(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
return opConnectorService.updateConnectorConfig(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
}

@Override
Expand All @@ -206,20 +210,20 @@ public Result<Void> restartMirrorMaker(Long connectClusterId, String sourceConne

Result<Void> rv = Result.buildSuc();
if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) {
rv = connectorService.restartConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
rv = opConnectorService.restartConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}

if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) {
rv = connectorService.restartConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
rv = opConnectorService.restartConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}

return connectorService.restartConnector(connectClusterId, sourceConnectorName, operator);
return opConnectorService.restartConnector(connectClusterId, sourceConnectorName, operator);
}

@Override
Expand All @@ -231,20 +235,20 @@ public Result<Void> stopMirrorMaker(Long connectClusterId, String sourceConnecto

Result<Void> rv = Result.buildSuc();
if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) {
rv = connectorService.stopConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
rv = opConnectorService.stopConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}

if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) {
rv = connectorService.stopConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
rv = opConnectorService.stopConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}

return connectorService.stopConnector(connectClusterId, sourceConnectorName, operator);
return opConnectorService.stopConnector(connectClusterId, sourceConnectorName, operator);
}

@Override
Expand All @@ -256,20 +260,20 @@ public Result<Void> resumeMirrorMaker(Long connectClusterId, String sourceConnec

Result<Void> rv = Result.buildSuc();
if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) {
rv = connectorService.resumeConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
rv = opConnectorService.resumeConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}

if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) {
rv = connectorService.resumeConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
rv = opConnectorService.resumeConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}

return connectorService.resumeConnector(connectClusterId, sourceConnectorName, operator);
return opConnectorService.resumeConnector(connectClusterId, sourceConnectorName, operator);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public List<ConnectorMetrics> collectConnectMetrics(ConnectCluster connectCluste
Long connectClusterId = connectCluster.getId();

List<VersionControlItem> items = versionControlService.listVersionControlItem(this.getClusterVersion(connectCluster), collectorType().getCode());
Result<List<String>> connectorList = connectorService.listConnectorsFromCluster(connectClusterId);
Result<List<String>> connectorList = connectorService.listConnectorsFromCluster(connectCluster);

FutureWaitUtil<Void> future = this.getFutureUtilByClusterPhyId(connectClusterId);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.meta;

import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
* Kafka元信息服务接口
*/
public interface KafkaMetaService<T> {
/**
* 从Kafka中获取数据
* @param connectCluster connect集群
* @return 全部资源列表, 成功的资源列表
*/
default Result<Tuple<Set<String>, List<T>>> getDataFromKafka(ConnectCluster connectCluster) { return Result.buildSuc(new Tuple<>(new HashSet<>(), new ArrayList<>())); }

/**
* 从Kafka中获取数据
* @param clusterPhy kafka集群
* @return 全部资源集合, 成功的资源列表
*/
default Result<Tuple<Set<String>, List<T>>> getDataFromKafka(ClusterPhy clusterPhy) { return Result.buildSuc(new Tuple<>(new HashSet<>(), new ArrayList<>())); }

/**
* 元信息同步至DB中
* @param clusterId 集群ID
* @param fullNameSet 全部资源列表
* @param dataList 成功的资源列表
*/
default void writeToDB(Long clusterId, Set<String> fullNameSet, List<T> dataList) {}

/**
* 依据kafka集群ID删除数据
* @param clusterPhyId kafka集群ID
*/
default int deleteInDBByKafkaClusterId(Long clusterPhyId) { return 0; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant;
import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.Triple;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;

import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -24,6 +26,9 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant.MIRROR_MAKER_SOURCE_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME;
import static com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant.MIRROR_MAKER_TARGET_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME;

public class ConnectConverter {
public static ConnectorBasicCombineExistVO convert2BasicVO(ConnectCluster connectCluster, ConnectorPO connectorPO) {
ConnectorBasicCombineExistVO vo = new ConnectorBasicCombineExistVO();
Expand Down Expand Up @@ -153,6 +158,66 @@ public static KSConnector convert2KSConnector(Long kafkaClusterPhyId, Long conne
return ksConnector;
}

public static List<KSConnector> convertAndSupplyMirrorMakerInfo(ConnectCluster connectCluster, List<Triple<KSConnectorInfo, List<String>, KSConnectorStateInfo>> connectorFullInfoList) {
// <connectorName, targetBootstrapServers + "@" + sourceBootstrapServers>
Map<String, String> sourceMap = new HashMap<>();

// <targetBootstrapServers + "@" + sourceBootstrapServers, connectorName>
Map<String, String> heartbeatMap = new HashMap<>();
Map<String, String> checkpointMap = new HashMap<>();

// 获取每个类型的connector的map信息
connectorFullInfoList.forEach(connector -> {
Map<String, String> mm2Map = null;
if (KafkaConnectConstant.MIRROR_MAKER_SOURCE_CONNECTOR_TYPE.equals(connector.v1().getConfig().get(KafkaConnectConstant.CONNECTOR_CLASS_FILED_NAME))) {
mm2Map = sourceMap;
} else if (KafkaConnectConstant.MIRROR_MAKER_HEARTBEAT_CONNECTOR_TYPE.equals(connector.v1().getConfig().get(KafkaConnectConstant.CONNECTOR_CLASS_FILED_NAME))) {
mm2Map = heartbeatMap;
} else if (KafkaConnectConstant.MIRROR_MAKER_CHECKPOINT_CONNECTOR_TYPE.equals(connector.v1().getConfig().get(KafkaConnectConstant.CONNECTOR_CLASS_FILED_NAME))) {
mm2Map = checkpointMap;
}

String targetBootstrapServers = connector.v1().getConfig().get(MIRROR_MAKER_TARGET_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME);
String sourceBootstrapServers = connector.v1().getConfig().get(MIRROR_MAKER_SOURCE_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME);

if (ValidateUtils.anyBlank(targetBootstrapServers, sourceBootstrapServers) || mm2Map == null) {
return;
}

if (KafkaConnectConstant.MIRROR_MAKER_SOURCE_CONNECTOR_TYPE.equals(connector.v1().getConfig().get(KafkaConnectConstant.CONNECTOR_CLASS_FILED_NAME))) {
// source 类型的格式和 heartbeat & checkpoint 的不一样
mm2Map.put(connector.v1().getName(), targetBootstrapServers + "@" + sourceBootstrapServers);
} else {
mm2Map.put(targetBootstrapServers + "@" + sourceBootstrapServers, connector.v1().getName());
}
});


List<KSConnector> connectorList = new ArrayList<>();
connectorFullInfoList.forEach(connector -> {
// 转化并添加到list中
KSConnector ksConnector = ConnectConverter.convert2KSConnector(
connectCluster.getKafkaClusterPhyId(),
connectCluster.getId(),
connector.v1(),
connector.v3(),
connector.v2()
);
connectorList.add(ksConnector);

// 补充mm2信息
String targetAndSource = sourceMap.get(ksConnector.getConnectorName());
if (ValidateUtils.isBlank(targetAndSource)) {
return;
}

ksConnector.setHeartbeatConnectorName(heartbeatMap.getOrDefault(targetAndSource, ""));
ksConnector.setCheckpointConnectorName(checkpointMap.getOrDefault(targetAndSource, ""));
});

return connectorList;
}

private static String genConnectorKey(Long connectorId, String connectorName){
return connectorId + "#" + connectorName;
}
Expand Down
Loading