Skip to content

Commit

Permalink
Fix RegionAwareEnsemblePlacementPolicy.newEnsemble sometimes failed p…
Browse files Browse the repository at this point in the history
…roblem. (#3725)

Descriptions of the changes in this PR:
Fixes #3722

### Motivation
See [#3722](#3722 (comment))
  • Loading branch information
horizonzy authored Feb 17, 2023
1 parent 7f8c31b commit 2381d9b
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -430,13 +430,14 @@ protected PlacementResult<List<BookieId>> newEnsembleInternal(
curRack = curRack + NetworkTopologyImpl.NODE_SEPARATOR + prevNode.getNetworkLocation();
}
}
boolean firstBookieInTheEnsemble = (null == prevNode);
try {
prevNode = selectRandomFromRack(curRack, excludeNodes, ensemble, ensemble);
} catch (BKNotEnoughBookiesException e) {
if (!curRack.equals(NodeBase.ROOT)) {
curRack = NodeBase.ROOT;
prevNode = selectFromNetworkLocation(curRack, excludeNodes, ensemble, ensemble,
!enforceMinNumRacksPerWriteQuorum || prevNode == null);
!enforceMinNumRacksPerWriteQuorum || firstBookieInTheEnsemble);
} else {
throw e;
}
Expand Down Expand Up @@ -1185,7 +1186,7 @@ private PlacementResult<List<BookieId>> doReplaceToAdherePlacementPolicy(
curRack = localNode.getNetworkLocation();
}
} else {
curRack = "~" + prevNode.getNetworkLocation();
curRack = NetworkTopologyImpl.INVERSE + prevNode.getNetworkLocation();
}
try {
prevNode = replaceToAdherePlacementPolicyInternal(
Expand Down Expand Up @@ -1252,7 +1253,7 @@ private BookieNode replaceToAdherePlacementPolicyInternal(
// avoid additional replace from write quorum candidates by preExcludeRacks and postExcludeRacks
// avoid to use first candidate bookies for election by provisionalEnsembleNodes
conditionList.add(Pair.of(
"~" + String.join(",",
NetworkTopologyImpl.INVERSE + String.join(",",
Stream.concat(preExcludeRacks.stream(), postExcludeRacks.stream()).collect(Collectors.toSet())),
provisionalEnsembleNodes
));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ private String getExcludedZonesString(Set<String> excludeZones) {
if (excludeZones.isEmpty()) {
return "";
}
StringBuilder excludedZonesString = new StringBuilder("~");
StringBuilder excludedZonesString = new StringBuilder(NetworkTopologyImpl.INVERSE);
boolean firstZone = true;
for (String excludeZone : excludeZones) {
if (!firstZone) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class NetworkTopologyImpl implements NetworkTopology {
public static final int DEFAULT_HOST_LEVEL = 2;
public static final Logger LOG = LoggerFactory.getLogger(NetworkTopologyImpl.class);
public static final String NODE_SEPARATOR = ",";
public static final String INVERSE = "~";

/**
* A marker for an InvalidTopology Exception.
Expand Down Expand Up @@ -708,7 +709,7 @@ protected boolean isSameParents(Node node1, Node node2) {
public Node chooseRandom(String scope) {
netlock.readLock().lock();
try {
if (scope.startsWith("~")) {
if (scope.startsWith(INVERSE)) {
return chooseRandom(NodeBase.ROOT, scope.substring(1));
} else {
return chooseRandom(scope, null);
Expand Down Expand Up @@ -774,7 +775,7 @@ private Set<Node> doGetLeaves(String scope) {
public Set<Node> getLeaves(String scope) {
netlock.readLock().lock();
try {
if (scope.startsWith("~")) {
if (scope.startsWith(INVERSE)) {
Set<Node> allNodes = doGetLeaves(NodeBase.ROOT);
String[] excludeScopes = scope.substring(1).split(NODE_SEPARATOR);
Set<Node> excludeNodes = new HashSet<Node>();
Expand All @@ -794,7 +795,7 @@ public Set<Node> getLeaves(String scope) {
@Override
public int countNumOfAvailableNodes(String scope, Collection<Node> excludedNodes) {
boolean isExcluded = false;
if (scope.startsWith("~")) {
if (scope.startsWith(INVERSE)) {
isExcluded = true;
scope = scope.substring(1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1518,6 +1518,130 @@ public void testNewEnsembleWithMultipleRacks() throws Exception {
}
}

//see: /~https://github.com/apache/bookkeeper/issues/3722
@Test
public void testNewEnsembleWithMultipleRacksWithCommonRack() throws Exception {
ClientConfiguration clientConf = new ClientConfiguration(conf);
clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
clientConf.setMinNumRacksPerWriteQuorum(3);
repp.uninitalize();
repp = new RackawareEnsemblePlacementPolicy();
repp.initialize(clientConf, Optional.<DNSToSwitchMapping>empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);

BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181);
BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.6", 3181);
BookieSocketAddress addr7 = new BookieSocketAddress("127.0.0.7", 3181);
BookieSocketAddress addr8 = new BookieSocketAddress("127.0.0.8", 3181);
BookieSocketAddress addr9 = new BookieSocketAddress("127.0.0.9", 3181);
BookieSocketAddress addr10 = new BookieSocketAddress("127.0.0.10", 3181);
// update dns mapping
StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/r1");
StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r1");
StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r1");
StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r1");
StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/default-region/r1");
StaticDNSResolver.addNodeToRack(addr6.getHostName(), "/default-region/r1");
StaticDNSResolver.addNodeToRack(addr7.getHostName(), "/default-region/r1");
StaticDNSResolver.addNodeToRack(addr8.getHostName(), "/default-region/r1");
StaticDNSResolver.addNodeToRack(addr9.getHostName(), "/default-region/r2");
StaticDNSResolver.addNodeToRack(addr10.getHostName(), "/default-region/r3");
// Update cluster
Set<BookieId> addrs = new HashSet<BookieId>();
addrs.add(addr1.toBookieId());
addrs.add(addr2.toBookieId());
addrs.add(addr3.toBookieId());
addrs.add(addr4.toBookieId());
addrs.add(addr5.toBookieId());
addrs.add(addr6.toBookieId());
addrs.add(addr7.toBookieId());
addrs.add(addr8.toBookieId());
addrs.add(addr9.toBookieId());
addrs.add(addr10.toBookieId());
repp.onClusterChanged(addrs, new HashSet<BookieId>());

try {
int ensembleSize = 10;
int writeQuorumSize = 10;
int ackQuorumSize = 2;

for (int i = 0; i < 50; ++i) {
Set<BookieId> excludeBookies = new HashSet<>();
EnsemblePlacementPolicy.PlacementResult<List<BookieId>> ensembleResponse =
repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, null, excludeBookies);
}
} catch (Exception e) {
fail("Can not new ensemble selection succeed");
}
}

@Test
public void testNewEnsembleWithMultipleRacksWithCommonRackFailed() throws Exception {
ClientConfiguration clientConf = new ClientConfiguration(conf);
clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
clientConf.setMinNumRacksPerWriteQuorum(3);
repp.uninitalize();
repp = new RackawareEnsemblePlacementPolicy();
repp.initialize(clientConf, Optional.<DNSToSwitchMapping>empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);

BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181);
BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.6", 3181);
BookieSocketAddress addr7 = new BookieSocketAddress("127.0.0.7", 3181);
BookieSocketAddress addr8 = new BookieSocketAddress("127.0.0.8", 3181);
BookieSocketAddress addr9 = new BookieSocketAddress("127.0.0.9", 3181);
BookieSocketAddress addr10 = new BookieSocketAddress("127.0.0.10", 3181);
// update dns mapping
StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/default-region/r1");
StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r1");
StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r1");
StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r1");
StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/default-region/r1");
StaticDNSResolver.addNodeToRack(addr6.getHostName(), "/default-region/r1");
StaticDNSResolver.addNodeToRack(addr7.getHostName(), "/default-region/r1");
StaticDNSResolver.addNodeToRack(addr8.getHostName(), "/default-region/r1");
StaticDNSResolver.addNodeToRack(addr9.getHostName(), "/default-region/r1");
StaticDNSResolver.addNodeToRack(addr10.getHostName(), "/default-region/r2");
// Update cluster
Set<BookieId> addrs = new HashSet<BookieId>();
addrs.add(addr1.toBookieId());
addrs.add(addr2.toBookieId());
addrs.add(addr3.toBookieId());
addrs.add(addr4.toBookieId());
addrs.add(addr5.toBookieId());
addrs.add(addr6.toBookieId());
addrs.add(addr7.toBookieId());
addrs.add(addr8.toBookieId());
addrs.add(addr9.toBookieId());
addrs.add(addr10.toBookieId());
repp.onClusterChanged(addrs, new HashSet<BookieId>());

try {
int ensembleSize = 10;
int writeQuorumSize = 10;
int ackQuorumSize = 2;

Set<BookieId> excludeBookies = new HashSet<>();
EnsemblePlacementPolicy.PlacementResult<List<BookieId>> ensembleResponse =
repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, null, excludeBookies);
fail("Can not new ensemble selection succeed");
} catch (Exception e) {
assertTrue(e instanceof BKNotEnoughBookiesException);
}
}

@Test
public void testNewEnsembleWithPickDifferentRack() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,67 @@ public void testNewEnsembleWithEnoughRegions() throws Exception {
}
}

@Test
public void testNewEnsembleWithMultipleRacksWithCommonRack() throws Exception {
ClientConfiguration clientConf = new ClientConfiguration(conf);
clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
clientConf.setMinNumRacksPerWriteQuorum(3);
repp.uninitalize();
repp = new RegionAwareEnsemblePlacementPolicy();
repp.initialize(clientConf, Optional.<DNSToSwitchMapping>empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);

BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181);
BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.6", 3181);
BookieSocketAddress addr7 = new BookieSocketAddress("127.0.0.7", 3181);
BookieSocketAddress addr8 = new BookieSocketAddress("127.0.0.8", 3181);
BookieSocketAddress addr9 = new BookieSocketAddress("127.0.0.9", 3181);
BookieSocketAddress addr10 = new BookieSocketAddress("127.0.0.10", 3181);
// update dns mapping
StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/region1/r1");
StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region1/r1");
StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/region1/r1");
StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/region1/r1");
StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/region1/r1");
StaticDNSResolver.addNodeToRack(addr6.getHostName(), "/region1/r1");
StaticDNSResolver.addNodeToRack(addr7.getHostName(), "/region1/r2");
StaticDNSResolver.addNodeToRack(addr8.getHostName(), "/region1/r3");
StaticDNSResolver.addNodeToRack(addr9.getHostName(), "/region2/r1");
StaticDNSResolver.addNodeToRack(addr10.getHostName(), "/region3/r1");
// Update cluster
Set<BookieId> addrs = new HashSet<BookieId>();
addrs.add(addr1.toBookieId());
addrs.add(addr2.toBookieId());
addrs.add(addr3.toBookieId());
addrs.add(addr4.toBookieId());
addrs.add(addr5.toBookieId());
addrs.add(addr6.toBookieId());
addrs.add(addr7.toBookieId());
addrs.add(addr8.toBookieId());
addrs.add(addr9.toBookieId());
addrs.add(addr10.toBookieId());
repp.onClusterChanged(addrs, new HashSet<BookieId>());

try {
int ensembleSize = 10;
int writeQuorumSize = 10;
int ackQuorumSize = 2;

for (int i = 0; i < 50; ++i) {
Set<BookieId> excludeBookies = new HashSet<>();
EnsemblePlacementPolicy.PlacementResult<List<BookieId>> ensembleResponse =
repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, null, excludeBookies);
}
} catch (Exception e) {
fail("RegionAwareEnsemblePlacementPolicy should newEnsemble succeed.");
}
}

@Test
public void testNewEnsembleWithThreeRegions() throws Exception {
repp.uninitalize();
Expand Down

0 comments on commit 2381d9b

Please sign in to comment.