Skip to content

Commit

Permalink
Optimize ShardingTableBroadcastRoutingEngine and SingleTableSQLRouter…
Browse files Browse the repository at this point in the history
… logic for PostgreSQL schema (#17155)
  • Loading branch information
strongduanmu authored Apr 28, 2022
1 parent 05af4af commit b01274c
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ private static ShardingRouteEngine getDDLRoutingEngine(final ShardingRule shardi
if (!tableNames.isEmpty() && shardingRuleTableNames.isEmpty()) {
return new ShardingIgnoreRoutingEngine();
}
return new ShardingTableBroadcastRoutingEngine(metaData.getDefaultSchema(), sqlStatementContext, shardingRuleTableNames);
return new ShardingTableBroadcastRoutingEngine(metaData, sqlStatementContext, shardingRuleTableNames);
}

private static ShardingRouteEngine getDALRoutingEngine(final ShardingRule shardingRule, final ShardingSphereMetaData metaData, final SQLStatementContext<?> sqlStatementContext) {
Expand All @@ -140,11 +140,11 @@ private static ShardingRouteEngine getDALRoutingEngine(final ShardingRule shardi
return new ShardingIgnoreRoutingEngine();
}
if (sqlStatement instanceof MySQLOptimizeTableStatement) {
return new ShardingTableBroadcastRoutingEngine(metaData.getDefaultSchema(), sqlStatementContext, shardingRuleTableNames);
return new ShardingTableBroadcastRoutingEngine(metaData, sqlStatementContext, shardingRuleTableNames);
}
if (sqlStatement instanceof AnalyzeTableStatement) {
return shardingRuleTableNames.isEmpty() ? new ShardingDatabaseBroadcastRoutingEngine()
: new ShardingTableBroadcastRoutingEngine(metaData.getDefaultSchema(), sqlStatementContext, shardingRuleTableNames);
: new ShardingTableBroadcastRoutingEngine(metaData, sqlStatementContext, shardingRuleTableNames);
}
if (!shardingRuleTableNames.isEmpty()) {
return new ShardingUnicastRoutingEngine(shardingRuleTableNames);
Expand All @@ -161,8 +161,7 @@ private static ShardingRouteEngine getDCLRoutingEngine(final ShardingRule shardi
if (isDCLForSingleTable(sqlStatementContext)) {
Collection<String> shardingRuleTableNames = shardingRule.getShardingRuleTableNames(sqlStatementContext.getTablesContext().getTableNames());
return !shardingRuleTableNames.isEmpty()
? new ShardingTableBroadcastRoutingEngine(metaData.getDefaultSchema(), sqlStatementContext, shardingRuleTableNames)
: new ShardingIgnoreRoutingEngine();
? new ShardingTableBroadcastRoutingEngine(metaData, sqlStatementContext, shardingRuleTableNames) : new ShardingIgnoreRoutingEngine();
} else {
return new ShardingInstanceBroadcastRoutingEngine(metaData.getResource().getDataSourcesMetaData());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.binder.type.IndexAvailable;
import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.schema.util.IndexMetaDataUtil;
import org.apache.shardingsphere.infra.route.context.RouteContext;
import org.apache.shardingsphere.infra.route.context.RouteMapper;
Expand All @@ -42,7 +42,7 @@
@RequiredArgsConstructor
public final class ShardingTableBroadcastRoutingEngine implements ShardingRouteEngine {

private final ShardingSphereSchema schema;
private final ShardingSphereMetaData metaData;

private final SQLStatementContext<?> sqlStatementContext;

Expand Down Expand Up @@ -105,7 +105,8 @@ private Collection<String> getLogicTableNames() {
if (!shardingRuleTableNames.isEmpty()) {
return shardingRuleTableNames;
}
return sqlStatementContext instanceof IndexAvailable ? IndexMetaDataUtil.getTableNamesFromMetaData(schema, ((IndexAvailable) sqlStatementContext).getIndexes()) : Collections.emptyList();
return sqlStatementContext instanceof IndexAvailable ? IndexMetaDataUtil.getTableNamesFromMetaData(metaData,
((IndexAvailable) sqlStatementContext).getIndexes(), sqlStatementContext.getDatabaseType()) : Collections.emptyList();
}

private Collection<RouteUnit> getBroadcastTableRouteUnits(final ShardingRule shardingRule, final String broadcastTableName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.binder.statement.ddl.DropIndexStatementContext;
import org.apache.shardingsphere.infra.binder.type.IndexAvailable;
import org.apache.shardingsphere.infra.database.DefaultSchema;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.resource.ShardingSphereResource;
import org.apache.shardingsphere.infra.metadata.rule.ShardingSphereRuleMetaData;
import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
import org.apache.shardingsphere.infra.route.context.RouteContext;
import org.apache.shardingsphere.infra.route.context.RouteMapper;
Expand All @@ -38,6 +43,8 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
Expand All @@ -53,7 +60,7 @@ public final class ShardingTableBroadcastRoutingEngineTest extends AbstractRouti
public void assertRouteForEmptyTable() {
Collection<String> tableNames = Collections.emptyList();
ShardingTableBroadcastRoutingEngine shardingTableBroadcastRoutingEngine =
new ShardingTableBroadcastRoutingEngine(mock(ShardingSphereSchema.class), createSQLStatementContext(tableNames), tableNames);
new ShardingTableBroadcastRoutingEngine(mock(ShardingSphereMetaData.class), createSQLStatementContext(tableNames), tableNames);
RouteContext routeContext = shardingTableBroadcastRoutingEngine.route(createShardingRule(false));
assertRouteUnitWithoutTables(routeContext);
}
Expand All @@ -62,7 +69,7 @@ public void assertRouteForEmptyTable() {
public void assertRouteForNormalTable() {
Collection<String> tableNames = Collections.singletonList("t_order");
ShardingTableBroadcastRoutingEngine shardingTableBroadcastRoutingEngine =
new ShardingTableBroadcastRoutingEngine(mock(ShardingSphereSchema.class), createSQLStatementContext(tableNames), tableNames);
new ShardingTableBroadcastRoutingEngine(mock(ShardingSphereMetaData.class), createSQLStatementContext(tableNames), tableNames);
RouteContext routeContext = shardingTableBroadcastRoutingEngine.route(createShardingRule(false));
assertThat(routeContext.getActualDataSourceNames().size(), is(2));
assertThat(routeContext.getRouteUnits().size(), is(4));
Expand All @@ -77,7 +84,7 @@ public void assertRouteForNormalTable() {
public void assertRouteForBroadcastTable() {
Collection<String> tableNames = Collections.singletonList("t_order");
ShardingTableBroadcastRoutingEngine shardingTableBroadcastRoutingEngine =
new ShardingTableBroadcastRoutingEngine(mock(ShardingSphereSchema.class), createSQLStatementContext(tableNames), tableNames);
new ShardingTableBroadcastRoutingEngine(mock(ShardingSphereMetaData.class), createSQLStatementContext(tableNames), tableNames);
RouteContext routeContext = shardingTableBroadcastRoutingEngine.route(createShardingRule(true));
assertThat(routeContext.getActualDataSourceNames().size(), is(2));
assertThat(routeContext.getRouteUnits().size(), is(2));
Expand All @@ -93,11 +100,15 @@ public void assertRouteForDropIndexStatement() {
when(schema.get(anyString()).getIndexes().containsKey(anyString())).thenReturn(true);
IndexSegment segment = mock(IndexSegment.class, RETURNS_DEEP_STUBS);
when(segment.getIndexName().getIdentifier().getValue()).thenReturn("t_order");
when(segment.getOwner()).thenReturn(Optional.empty());
SQLStatementContext<DropIndexStatement> sqlStatementContext = mock(DropIndexStatementContext.class, RETURNS_DEEP_STUBS);
Collection<String> tableNames = Collections.emptyList();
when(sqlStatementContext.getTablesContext().getTableNames()).thenReturn(tableNames);
when(sqlStatementContext.getDatabaseType()).thenReturn(new MySQLDatabaseType());
when(((IndexAvailable) sqlStatementContext).getIndexes()).thenReturn(Collections.singletonList(segment));
ShardingTableBroadcastRoutingEngine shardingTableBroadcastRoutingEngine = new ShardingTableBroadcastRoutingEngine(schema, sqlStatementContext, tableNames);
Map<String, ShardingSphereSchema> schemas = Collections.singletonMap(DefaultSchema.LOGIC_NAME, schema);
ShardingSphereMetaData metaData = new ShardingSphereMetaData(DefaultSchema.LOGIC_NAME, mock(ShardingSphereResource.class), mock(ShardingSphereRuleMetaData.class), schemas);
ShardingTableBroadcastRoutingEngine shardingTableBroadcastRoutingEngine = new ShardingTableBroadcastRoutingEngine(metaData, sqlStatementContext, tableNames);
RouteContext routeContext = shardingTableBroadcastRoutingEngine.route(createShardingRule(false));
assertThat(routeContext.getActualDataSourceNames().size(), is(2));
Iterator<RouteUnit> routeUnits = routeContext.getRouteUnits().iterator();
Expand All @@ -116,7 +127,9 @@ public void assertRouteForDropIndexStatementDoNotFoundTables() {
SQLStatementContext<DropIndexStatement> sqlStatementContext = mock(DropIndexStatementContext.class, RETURNS_DEEP_STUBS);
Collection<String> tableNames = Collections.emptyList();
when(sqlStatementContext.getTablesContext().getTableNames()).thenReturn(tableNames);
ShardingTableBroadcastRoutingEngine shardingTableBroadcastRoutingEngine = new ShardingTableBroadcastRoutingEngine(schema, sqlStatementContext, tableNames);
Map<String, ShardingSphereSchema> schemas = Collections.singletonMap(DefaultSchema.LOGIC_NAME, schema);
ShardingSphereMetaData metaData = new ShardingSphereMetaData(DefaultSchema.LOGIC_NAME, mock(ShardingSphereResource.class), mock(ShardingSphereRuleMetaData.class), schemas);
ShardingTableBroadcastRoutingEngine shardingTableBroadcastRoutingEngine = new ShardingTableBroadcastRoutingEngine(metaData, sqlStatementContext, tableNames);
RouteContext routeContext = shardingTableBroadcastRoutingEngine.route(createShardingRule(false));
assertRouteUnitWithoutTables(routeContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.google.common.base.Strings;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
import org.apache.shardingsphere.sql.parser.sql.common.segment.ddl.index.IndexSegment;
import org.apache.shardingsphere.sql.parser.sql.common.segment.dml.column.ColumnSegment;
Expand Down Expand Up @@ -78,14 +80,17 @@ public static String getGeneratedLogicIndexName(final Collection<ColumnSegment>
/**
* Get table names from metadata.
*
* @param schema schema
* @param metaData meta data
* @param indexes indexes
* @param databaseType database type
* @return table names
*/
public static Collection<String> getTableNamesFromMetaData(final ShardingSphereSchema schema, final Collection<IndexSegment> indexes) {
public static Collection<String> getTableNamesFromMetaData(final ShardingSphereMetaData metaData, final Collection<IndexSegment> indexes, final DatabaseType databaseType) {
Collection<String> result = new LinkedList<>();
String schemaName = databaseType.getDefaultSchema(metaData.getDatabaseName());
for (IndexSegment each : indexes) {
findLogicTableNameFromMetaData(schema, each.getIndexName().getIdentifier().getValue()).ifPresent(result::add);
String actualSchemaName = each.getOwner().map(optional -> optional.getIdentifier().getValue()).orElse(schemaName);
findLogicTableNameFromMetaData(metaData.getSchemaByName(actualSchemaName), each.getIndexName().getIdentifier().getValue()).ifPresent(result::add);
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
package org.apache.shardingsphere.infra.metadata.schema.util;

import com.google.common.collect.Lists;
import org.apache.shardingsphere.infra.database.DefaultSchema;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.resource.ShardingSphereResource;
import org.apache.shardingsphere.infra.metadata.rule.ShardingSphereRuleMetaData;
import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
import org.apache.shardingsphere.infra.metadata.schema.model.IndexMetaData;
import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
Expand All @@ -34,6 +39,7 @@

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;

public final class IndexMetaDataUtilTest {

Expand Down Expand Up @@ -76,13 +82,14 @@ public void assertGetGeneratedLogicIndexName() {
@Test
public void assertGetTableNamesFromMetaData() {
IndexSegment indexSegment = new IndexSegment(0, 0, new IndexNameSegment(0, 0, new IdentifierValue(INDEX_NAME)));
assertThat(IndexMetaDataUtil.getTableNamesFromMetaData(buildSchema(), Lists.newArrayList(indexSegment)), is(Collections.singletonList(TABLE_NAME)));
assertThat(IndexMetaDataUtil.getTableNamesFromMetaData(buildMetaData(), Lists.newArrayList(indexSegment), new MySQLDatabaseType()), is(Collections.singletonList(TABLE_NAME)));
}

private ShardingSphereSchema buildSchema() {
private ShardingSphereMetaData buildMetaData() {
TableMetaData tableMetaData = new TableMetaData(TABLE_NAME, Collections.emptyList(), Collections.singletonList(new IndexMetaData(INDEX_NAME)), Collections.emptyList());
Map<String, TableMetaData> tables = new HashMap<>(1, 1);
tables.put(TABLE_NAME, tableMetaData);
return new ShardingSphereSchema(tables);
Map<String, ShardingSphereSchema> schemas = Collections.singletonMap(DefaultSchema.LOGIC_NAME, new ShardingSphereSchema(tables));
return new ShardingSphereMetaData(DefaultSchema.LOGIC_NAME, mock(ShardingSphereResource.class), mock(ShardingSphereRuleMetaData.class), schemas);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,12 @@

package org.apache.shardingsphere.infra.context.refresher.type;

import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Multimap;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import org.apache.shardingsphere.infra.context.refresher.MetaDataRefresher;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.infra.federation.optimizer.context.planner.OptimizerPlannerContext;
import org.apache.shardingsphere.infra.federation.optimizer.metadata.FederationDatabaseMetaData;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
import org.apache.shardingsphere.infra.metadata.schema.event.SchemaAlteredEvent;
import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
import org.apache.shardingsphere.infra.metadata.schema.util.IndexMetaDataUtil;
Expand All @@ -37,7 +34,6 @@
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;

Expand All @@ -51,42 +47,30 @@ public final class DropIndexStatementSchemaRefresher implements MetaDataRefreshe
@Override
public void refresh(final ShardingSphereMetaData metaData, final FederationDatabaseMetaData database, final Map<String, OptimizerPlannerContext> optimizerPlanners,
final Collection<String> logicDataSourceNames, final String schemaName, final DropIndexStatement sqlStatement, final ConfigurationProperties props) throws SQLException {
Multimap<String, IndexSegment> schemaIndexSegmentMap = getSchemaIndexSegmentMap(sqlStatement, schemaName);
for (String each : schemaIndexSegmentMap.keySet()) {
ShardingSphereSchema schema = metaData.getSchemaByName(each);
Collection<TableMetaData> alteredTables = new LinkedList<>();
for (String tableName : getLogicTableNames(schema, sqlStatement)) {
if (!schema.containsTable(tableName)) {
continue;
}
TableMetaData tableMetaData = schema.get(tableName);
schemaIndexSegmentMap.get(each).forEach(indexSegment -> tableMetaData.getIndexes().remove(indexSegment.getIndexName().getIdentifier().getValue()));
alteredTables.add(tableMetaData);
for (IndexSegment each : sqlStatement.getIndexes()) {
String actualSchemaName = each.getOwner().map(optional -> optional.getIdentifier().getValue()).orElse(schemaName);
Optional<String> logicTableName = findLogicTableName(metaData, sqlStatement, Collections.singletonList(each));
if (!logicTableName.isPresent()) {
continue;
}
post(metaData.getDatabaseName(), each, alteredTables);
TableMetaData tableMetaData = metaData.getSchemaByName(actualSchemaName).get(logicTableName.get());
tableMetaData.getIndexes().remove(each.getIndexName().getIdentifier().getValue());
post(metaData.getDatabaseName(), actualSchemaName, tableMetaData);
}
}

private Collection<String> getLogicTableNames(final ShardingSphereSchema schema, final DropIndexStatement sqlStatement) {
private Optional<String> findLogicTableName(final ShardingSphereMetaData metaData, final DropIndexStatement sqlStatement, final Collection<IndexSegment> indexSegments) {
Optional<SimpleTableSegment> simpleTableSegment = DropIndexStatementHandler.getSimpleTableSegment(sqlStatement);
if (simpleTableSegment.isPresent()) {
return Collections.singletonList(simpleTableSegment.get().getTableName().getIdentifier().getValue());
return Optional.of(simpleTableSegment.get().getTableName().getIdentifier().getValue());
}
return IndexMetaDataUtil.getTableNamesFromMetaData(schema, sqlStatement.getIndexes());
Collection<String> tableNames = IndexMetaDataUtil.getTableNamesFromMetaData(metaData, indexSegments, metaData.getResource().getDatabaseType());
return tableNames.isEmpty() ? Optional.empty() : Optional.of(tableNames.iterator().next());
}

private Multimap<String, IndexSegment> getSchemaIndexSegmentMap(final DropIndexStatement dropIndexStatement, final String defaultSchemaName) {
Multimap<String, IndexSegment> result = LinkedHashMultimap.create();
for (IndexSegment each : dropIndexStatement.getIndexes()) {
String schemaName = each.getOwner().map(optional -> optional.getIdentifier().getValue()).orElseGet(() -> defaultSchemaName);
result.put(schemaName, each);
}
return result;
}

private void post(final String databaseName, final String schemaName, final Collection<TableMetaData> alteredTables) {
private void post(final String databaseName, final String schemaName, final TableMetaData tableMetaData) {
SchemaAlteredEvent event = new SchemaAlteredEvent(databaseName, schemaName);
event.getAlteredTables().addAll(alteredTables);
event.getAlteredTables().add(tableMetaData);
ShardingSphereEventBus.getInstance().post(event);
}

Expand Down
Loading

0 comments on commit b01274c

Please sign in to comment.