Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize: automatic deletion of namingserver vgroup through Caffeine map #6770

Merged
merged 28 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
50c77dc
add signature
ggbocoder Jan 2, 2024
b36f5b2
Merge remote-tracking branch 'upstream/2.x' into 2.x
ggbocoder Jan 20, 2024
e15537d
Merge remote-tracking branch 'upstream/2.x' into 2.x
ggbocoder Jan 25, 2024
85398ec
Merge remote-tracking branch 'upstream/2.x' into 2.x
ggbocoder Jan 31, 2024
e049125
Merge remote-tracking branch 'upstream/2.x' into 2.x
ggbocoder Mar 7, 2024
2939f59
Merge remote-tracking branch 'upstream/2.x' into 2.x
ggbocoder May 12, 2024
ebeda3a
Merge remote-tracking branch 'upstream/2.x' into 2.x
ggbocoder Jul 5, 2024
f2f9509
Merge branch '2.x' of /~https://github.com/seata/seata into 2.x
ggbocoder Jul 7, 2024
eed8785
Merge branch '2.x' of /~https://github.com/seata/seata into 2.x
ggbocoder Jul 15, 2024
5147f84
Merge branch '2.x' of /~https://github.com/seata/seata into 2.x
ggbocoder Jul 28, 2024
f6f4293
Merge branch '2.x' of /~https://github.com/seata/seata into 2.x
ggbocoder Aug 3, 2024
858aabd
Merge branch '2.x' of /~https://github.com/seata/seata into 2.x
ggbocoder Aug 14, 2024
f90f421
Merge branch '2.x' of /~https://github.com/seata/seata into 2.x
ggbocoder Aug 17, 2024
cf97436
fix
ggbocoder Aug 19, 2024
888aaab
Merge remote-tracking branch 'upstream/2.x' into optimize/optimize_na…
ggbocoder Aug 21, 2024
5368f94
fix
ggbocoder Aug 21, 2024
fcdb316
fix
ggbocoder Aug 21, 2024
a5d156b
fix code style
ggbocoder Aug 23, 2024
5cf5d78
fix
ggbocoder Aug 23, 2024
8b13293
fix UT
ggbocoder Aug 23, 2024
dcba549
fix caffeine version
ggbocoder Aug 23, 2024
af275d9
fix
ggbocoder Aug 23, 2024
68a90f5
fix
ggbocoder Aug 23, 2024
f180ed0
fix
ggbocoder Aug 23, 2024
e0402bf
Merge remote-tracking branch 'upstream/2.x' into optimize/optimize_na…
ggbocoder Aug 23, 2024
cca510e
fix
ggbocoder Aug 23, 2024
a81ac44
fix
ggbocoder Aug 23, 2024
65152ed
fix
ggbocoder Aug 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6763](/~https://github.com/apache/incubator-seata/pull/6763)] optimize NacosConfiguration singleton reload
- [[#6761](/~https://github.com/apache/incubator-seata/pull/6761)] optimize the namingserver code to improve readability
- [[#6768](/~https://github.com/apache/incubator-seata/pull/6768)] report the tcc fence transaction isolation level

- [[#6770](/~https://github.com/apache/incubator-seata/pull/6770)] Automatic deletion of namingserver vgroup through Caffeine map

### refactor:

Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
- [[#6763](/~https://github.com/apache/incubator-seata/pull/6763)] 优化 NacosConfiguration 单例加载
- [[#6761](/~https://github.com/apache/incubator-seata/pull/6761)] 提升namingserver manager代码可读性
- [[#6768](/~https://github.com/apache/incubator-seata/pull/6768)] 上报tcc fence事务隔离级别
- [[#6770](/~https://github.com/apache/incubator-seata/pull/6770)] 通过caffeine map支持namingserver事务分组的过期删除


### refactor:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@ public class NamingServerNode extends Node {
private double weight = 1.0;
private boolean healthy = true;
private long term;
private String unit;

public String getUnit() {
return unit;
}

public void setUnit(String unit) {
this.unit = unit;
}

public double getWeight() {
return weight;
Expand Down
6 changes: 6 additions & 0 deletions dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
<druid.version>1.2.20</druid.version>
<commons-dbcp2.version>2.9.0</commons-dbcp2.version>
<hikari.version>3.4.3</hikari.version>
<caffeine.version>2.8.8</caffeine.version>
<!-- sql parser -->
<antlr4.version>4.8</antlr4.version>
<!-- for jdbc driver when package -->
Expand Down Expand Up @@ -395,6 +396,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>${caffeine.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.spring</groupId>
<artifactId>spring-context-support</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import org.slf4j.LoggerFactory;



public class NamingserverRegistryServiceImpl implements RegistryService<NamingListener> {
private static final Logger LOGGER = LoggerFactory.getLogger(NamingserverRegistryServiceImpl.class);

Expand Down Expand Up @@ -163,15 +162,6 @@ public void register(InetSocketAddress address) throws Exception {
heartBeatScheduledFuture.cancel(false);
}

heartBeatScheduledFuture = this.executorService.scheduleAtFixedRate(() -> {
try {
instance.setTimestamp(System.currentTimeMillis());
doRegister(instance, getNamingAddrs());
} catch (Exception e) {
LOGGER.error("Naming server register Exception", e);
}
}, HEARTBEAT_PERIOD, HEARTBEAT_PERIOD, TimeUnit.MILLISECONDS);

}

public void doRegister(Instance instance, List<String> urlList) {
Expand Down Expand Up @@ -232,7 +222,7 @@ public void unregister(InetSocketAddress address) {
String unit = instance.getUnit();
String jsonBody = instance.toJsonString();
String params = "unit=" + unit;
params = params + "&cluster=" + instance.getClusterName();
params = params + "&clusterName=" + instance.getClusterName();
params = params + "&namespace=" + instance.getNamespace();
url += params;
Map<String, String> header = new HashMap<>();
Expand Down
5 changes: 5 additions & 0 deletions namingserver/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@
<artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.seata.common.metadata.Cluster;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.namingserver.entity.pojo.ClusterData;

public class NamespaceBO {
Expand Down Expand Up @@ -54,4 +55,11 @@ public ClusterBO getCluster(String clusterName) {
return clusterMap.computeIfAbsent(clusterName, k -> new ClusterBO());
}

public void removeOldCluster(String clusterName) {
clusterMap.keySet().forEach(currentClusterName -> {
if (!StringUtils.equals(currentClusterName, clusterName)) {
clusterMap.remove(currentClusterName);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.PostConstruct;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.ContentType;
import org.apache.http.protocol.HTTP;
Expand All @@ -49,6 +53,8 @@
import org.apache.seata.namingserver.listener.ClusterChangeEvent;
import org.apache.seata.namingserver.entity.pojo.ClusterData;
import org.apache.seata.namingserver.entity.vo.monitor.ClusterVO;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -65,7 +71,7 @@
public class NamingManager {
private static final Logger LOGGER = LoggerFactory.getLogger(NamingManager.class);
private final ConcurrentMap<InetSocketAddress, Long> instanceLiveTable;
private final ConcurrentMap<String/* VGroup */, ConcurrentMap<String/* namespace */, NamespaceBO>> vGroupMap;
private volatile LoadingCache<String/* VGroup */, ConcurrentMap<String/* namespace */,NamespaceBO>> vGroupMap;
private final ConcurrentMap<String/* namespace */,
ConcurrentMap<String/* clusterName */, ClusterData>> namespaceClusterDataMap;

Expand All @@ -83,12 +89,24 @@ public class NamingManager {

public NamingManager() {
this.instanceLiveTable = new ConcurrentHashMap<>();
this.vGroupMap = new ConcurrentHashMap<>();
this.namespaceClusterDataMap = new ConcurrentHashMap<>();
}

@PostConstruct
public void init() {
this.vGroupMap = Caffeine.newBuilder()
.expireAfterAccess(heartbeatTimeThreshold, TimeUnit.MILLISECONDS) // expired time
.maximumSize(Integer.MAX_VALUE)
.removalListener(new RemovalListener<Object, Object>() {

@Override
public void onRemoval(@Nullable Object key, @Nullable Object value, @NonNull RemovalCause cause) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("vgroup map expired,vgroup:{},namespace:{}", key, value);
}
}
})
.build(k -> new ConcurrentHashMap<>());
this.heartBeatCheckService.scheduleAtFixedRate(() -> {
try {
instanceHeartBeatCheck();
Expand All @@ -112,7 +130,7 @@ public List<ClusterVO> monitorCluster(String namespace) {
LOGGER.warn("no cluster in namespace:" + namespace);
}

vGroupMap.forEach((vGroup, namespaceMap) -> {
vGroupMap.asMap().forEach((vGroup, namespaceMap) -> {
NamespaceBO namespaceBO = namespaceMap.get(namespace);
if (namespaceBO != null) {
namespaceBO.getClusterMap().forEach((clusterName, clusterBO) -> {
Expand Down Expand Up @@ -161,6 +179,9 @@ public Result<String> createGroup(String namespace, String vGroup, String cluste
public Result<String> removeGroup(String namespace, String clusterName,String vGroup, String unitName) {
List<Cluster> clusterList = getClusterListByVgroup(vGroup, namespace);
for (Cluster cluster : clusterList) {
if (!StringUtils.equals(clusterName, cluster.getClusterName())) {
continue;
}
if (cluster.getUnitData() != null && cluster.getUnitData().size() > 0) {
Unit unit = cluster.getUnitData().get(0);
if (unit != null && unit.getNamingInstanceList() != null && unit.getNamingInstanceList().size() > 0) {
Expand Down Expand Up @@ -194,19 +215,21 @@ public Result<String> removeGroup(String namespace, String clusterName,String vG

public void addGroup(String namespace, String clusterName, String unitName, String vGroup) {
try {
ClusterBO clusterBO = vGroupMap.computeIfAbsent(vGroup, k -> new ConcurrentHashMap<>())
ClusterBO clusterBO = vGroupMap.get(vGroup, k -> new ConcurrentHashMap<>())
.computeIfAbsent(namespace, k -> new NamespaceBO()).getCluster(clusterName);
if (clusterBO != null && !clusterBO.getUnitNames().contains(unitName)) {
clusterBO.addUnit(unitName);
NamespaceBO namespaceBO = vGroupMap.getIfPresent(vGroup).get(namespace);
namespaceBO.removeOldCluster(clusterName);
applicationContext.publishEvent(new ClusterChangeEvent(this, vGroup, System.currentTimeMillis()));
}
} catch (Exception e) {
LOGGER.error("change vGroup mapping failed:{}", vGroup);
LOGGER.error("change vGroup mapping failed:{}", vGroup, e);
}
}

public void notifyClusterChange(String namespace, String clusterName, String unitName, long term) {
vGroupMap.forEach((vGroup, namespaceMap) -> {
vGroupMap.asMap().forEach((vGroup, namespaceMap) -> {
Optional.ofNullable(namespaceMap.get(namespace))
.flatMap(namespaceBO -> Optional.ofNullable(namespaceBO.getCluster(clusterName)))
.ifPresent(clusterBO -> {
Expand Down Expand Up @@ -239,7 +262,6 @@ public boolean registerInstance(NamingServerNode node, String namespace, String
});
}
});

boolean hasChanged = clusterData.registerInstance(node, unitName);
if (hasChanged) {
notifyClusterChange(namespace, clusterName, unitName,node.getTerm());
Expand All @@ -263,25 +285,25 @@ public boolean unregisterInstance(String namespace, String clusterName, String u
clusterData.removeInstance(node, unitName);
Object vgroupMap = node.getMetadata().get(CONSTANT_GROUP);
if (vgroupMap instanceof Map) {
((Map<String, Object>)vgroupMap).forEach((group, realUnitName) -> vGroupMap.get(group)
.get(namespace).getCluster(clusterName).remove((String) realUnitName));
((Map<String, Object>) vgroupMap).forEach((group, realUnitName) -> vGroupMap.get(group, k -> new ConcurrentHashMap<>())
.get(namespace).getCluster(clusterName).remove(realUnitName == null ? unitName : (String) realUnitName));
}
notifyClusterChange(namespace, clusterName, unitName, node.getTerm());
instanceLiveTable.remove(
new InetSocketAddress(node.getTransaction().getHost(), node.getTransaction().getPort()));
}
}
} catch (Exception e) {
LOGGER.error("Instance unregistered failed!");
LOGGER.error("Instance unregistered failed!", e);
return false;
}
return true;
}

public List<Cluster> getClusterListByVgroup(String vGroup, String namespace) {
// find the cluster where the transaction group is located
ConcurrentMap<String/* namespace */, NamespaceBO> vgroupNamespaceMap =
vGroupMap.get(vGroup);
HashMap<String/* VGroup */, ConcurrentMap<String/* namespace */,NamespaceBO>> concurrentVgroupMap = new HashMap<>(vGroupMap.asMap());
ConcurrentMap<String/* namespace */, NamespaceBO> vgroupNamespaceMap = concurrentVgroupMap.get(vGroup);
List<Cluster> clusterList = new ArrayList<>();
if (!CollectionUtils.isEmpty(vgroupNamespaceMap)) {
NamespaceBO namespaceBO = vgroupNamespaceMap.get(namespace);
Expand Down Expand Up @@ -325,9 +347,13 @@ public void instanceHeartBeatCheck() {
if (vgoupMap instanceof Map) {
((Map<String, Object>)vgoupMap).forEach((group, unitName) -> {
ClusterBO clusterBO =
vGroupMap.get(group).get(namespace).getCluster(clusterData.getClusterName());
vGroupMap.get(group)
.computeIfAbsent(namespace, k -> new NamespaceBO())
.getCluster(clusterData.getClusterName());
Set<String> units = clusterBO.getUnitNames();
units.remove((String)unitName);
if (units != null) {
units.remove(unitName == null ? instance.getUnit() : unitName);
}
});
}

Expand All @@ -352,10 +378,10 @@ public Result<String> changeGroup(String namespace, String vGroup, String cluste
if (StringUtils.equalsIgnoreCase(clusterName, currentCluster)) {
continue;
}
result.set(removeGroup(currentNamespace, clusterName, vGroup, unitName));
result.set(removeGroup(currentNamespace, currentCluster, vGroup, unitName));
} else {
if (!StringUtils.equalsIgnoreCase(unitName, currentUnitName)) {
result.set(removeGroup(currentNamespace, clusterName, vGroup, unitName));
result.set(removeGroup(currentNamespace, currentCluster, vGroup, unitName));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,41 +263,4 @@ void mockHeartbeat() throws InterruptedException {
assertEquals(8091, node1.getTransaction().getPort());
}

@Test
void mockIntermediateState() {
String clusterName = "cluster1";
String namespace = "public6";
String vGroup = "vgroup1";
String unitName = String.valueOf(UUID.randomUUID());
NamingServerNode node = new NamingServerNode();
node.setTransaction(new Node.Endpoint("127.0.0.1", 8091, "netty"));
node.setControl(new Node.Endpoint("127.0.0.1", 7091, "http"));
Map<String, Object> meatadata = node.getMetadata();
Map<String,Object> vGroups = new HashMap<>();
vGroups.put(vGroup,unitName);
meatadata.put(CONSTANT_GROUP, vGroups);
namingController.registerInstance(namespace, clusterName, unitName, node);
NamingServerNode node2 = new NamingServerNode();
node2.setTransaction(new Node.Endpoint("127.0.0.1", 8092, "netty"));
node2.setControl(new Node.Endpoint("127.0.0.1", 7092, "http"));
Map<String, Object> meatadata2 = node2.getMetadata();
Map<String,Object> vGroups2 = new HashMap<>();
String unitName2 = UUID.randomUUID().toString();
vGroups2.put(vGroup,unitName2);
meatadata2.put(CONSTANT_GROUP, vGroups2);
namingController.registerInstance(namespace, "clusterName2", unitName2, node2);
MetaResponse metaResponse = namingController.discovery(vGroup, namespace);
assertNotNull(metaResponse);
assertNotNull(metaResponse.getClusterList());
assertEquals(2, metaResponse.getClusterList().size());
Cluster cluster = metaResponse.getClusterList().get(1);
assertNotNull(cluster.getUnitData());
assertEquals(1, cluster.getUnitData().size());
Unit unit = cluster.getUnitData().get(0);
assertNotNull(unit.getNamingInstanceList());
assertEquals(1, unit.getNamingInstanceList().size());
Node unit2 = unit.getNamingInstanceList().get(0);
assertEquals(unit2.getTransaction(), node2.getTransaction());
}

}
Loading
Loading