diff --git a/checker/check_test.go b/checker/check_test.go new file mode 100644 index 0000000000..face1a4569 --- /dev/null +++ b/checker/check_test.go @@ -0,0 +1,41 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package checker + +import ( + "context" + "testing" + + tc "github.com/pingcap/check" + "github.com/pingcap/dm/dm/config" +) + +func TestChecker(t *testing.T) { + tc.TestingT(t) +} + +type testChecker struct{} + +var _ = tc.Suite(&testChecker{}) + +func (t *testChecker) TestCheckSyncConfig(c *tc.C) { + c.Assert(CheckSyncConfig(context.Background(), nil), tc.IsNil) + + cfgs := []*config.SubTaskConfig{ + { + IgnoreCheckingItems: []string{config.AllChecking}, + }, + } + c.Assert(CheckSyncConfig(context.Background(), cfgs), tc.IsNil) +} diff --git a/checker/checker.go b/checker/checker.go index 42d4a05884..02062655ee 100644 --- a/checker/checker.go +++ b/checker/checker.go @@ -14,6 +14,7 @@ package checker import ( + "bytes" "database/sql" "encoding/json" "fmt" @@ -53,17 +54,19 @@ type Checker struct { instances []*mysqlInstance - checkList []check.Checker - result struct { + checkList []check.Checker + checkingItems map[string]string + result struct { sync.RWMutex detail *check.Results } } // NewChecker returns a checker -func NewChecker(cfgs []*config.SubTaskConfig) *Checker { +func NewChecker(cfgs []*config.SubTaskConfig, checkingItems map[string]string) *Checker { c := &Checker{ - instances: make([]*mysqlInstance, 0, len(cfgs)), + instances: make([]*mysqlInstance, 0, len(cfgs)), + checkingItems: checkingItems, } for _, cfg := range cfgs { @@ -82,6 +85,9 @@ func (c *Checker) Init() error { shardingCounter := make(map[string]int) dbs := make(map[string]*sql.DB) columnMapping := make(map[string]*column.Mapping) + _, checkingShardID := c.checkingItems[config.ShardAutoIncrementIDChecking] + _, checkingShard := c.checkingItems[config.ShardTableSchemaChecking] + _, checkSchema := c.checkingItems[config.TableSchemaChecking] for _, instance := range c.instances { bw := filter.New(instance.cfg.CaseSensitive, instance.cfg.BWList) @@ -125,6 +131,29 @@ func (c *Checker) Init() error { return errors.Trace(err) } + if _, ok := c.checkingItems[config.VersionChecking]; ok { + c.checkList = append(c.checkList, check.NewMySQLVersionChecker(instance.sourceDB, instance.sourceDBinfo)) + } + if _, ok := c.checkingItems[config.BinlogEnableChecking]; ok { + c.checkList = append(c.checkList, check.NewMySQLBinlogEnableChecker(instance.sourceDB, instance.sourceDBinfo)) + } + if _, ok := c.checkingItems[config.BinlogFormatChecking]; ok { + c.checkList = append(c.checkList, check.NewMySQLBinlogFormatChecker(instance.sourceDB, instance.sourceDBinfo)) + } + if _, ok := c.checkingItems[config.BinlogRowImageChecking]; ok { + c.checkList = append(c.checkList, check.NewMySQLBinlogRowImageChecker(instance.sourceDB, instance.sourceDBinfo)) + } + if _, ok := c.checkingItems[config.DumpPrivilegeChecking]; ok { + c.checkList = append(c.checkList, check.NewSourceDumpPrivilegeChecker(instance.sourceDB, instance.sourceDBinfo)) + } + if _, ok := c.checkingItems[config.ReplicationPrivilegeChecking]; ok { + c.checkList = append(c.checkList, check.NewSourceReplicationPrivilegeChecker(instance.sourceDB, instance.sourceDBinfo)) + } + + if !checkingShard && !checkSchema { + continue + } + mapping, err := utils.FetchTargetDoTables(instance.sourceDB, bw, r) if err != nil { return errors.Trace(err) @@ -155,24 +184,39 @@ func (c *Checker) Init() error { } dbs[instance.cfg.SourceID] = instance.sourceDB - c.checkList = append(c.checkList, check.NewMySQLBinlogEnableChecker(instance.sourceDB, instance.sourceDBinfo)) - c.checkList = append(c.checkList, check.NewMySQLBinlogFormatChecker(instance.sourceDB, instance.sourceDBinfo)) - c.checkList = append(c.checkList, check.NewMySQLBinlogRowImageChecker(instance.sourceDB, instance.sourceDBinfo)) - c.checkList = append(c.checkList, check.NewSourcePrivilegeChecker(instance.sourceDB, instance.sourceDBinfo)) - c.checkList = append(c.checkList, check.NewTablesChecker(instance.sourceDB, instance.sourceDBinfo, checkTables)) + if checkSchema { + c.checkList = append(c.checkList, check.NewTablesChecker(instance.sourceDB, instance.sourceDBinfo, checkTables)) + } } - for name, shardingSet := range sharding { - if shardingCounter[name] <= 1 { - continue - } + if checkingShard { + for name, shardingSet := range sharding { + if shardingCounter[name] <= 1 { + continue + } - c.checkList = append(c.checkList, check.NewShardingTablesCheck(name, dbs, shardingSet, columnMapping)) + c.checkList = append(c.checkList, check.NewShardingTablesCheck(name, dbs, shardingSet, columnMapping, checkingShardID)) + } } + log.Infof(c.displayCheckingItems()) return nil } +func (c *Checker) displayCheckingItems() string { + if len(c.checkList) == 0 { + return "not found any checking items\n" + } + + var buf bytes.Buffer + fmt.Fprintf(&buf, "\n************ task %s checking items ************\n", c.instances[0].cfg.Name) + for _, checkFunc := range c.checkList { + fmt.Fprintf(&buf, "%s\n", checkFunc.Name()) + } + fmt.Fprintf(&buf, "************ task %s checking items ************", c.instances[0].cfg.Name) + return buf.String() +} + // Process implements Unit interface func (c *Checker) Process(ctx context.Context, pr chan pb.ProcessResult) { cctx, cancel := context.WithTimeout(ctx, time.Minute) diff --git a/checker/cmd.go b/checker/cmd.go index 7f17774a1f..0454350086 100644 --- a/checker/cmd.go +++ b/checker/cmd.go @@ -27,7 +27,17 @@ var ( // CheckSyncConfig checks synchronization configuration func CheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig) error { - c := NewChecker(cfgs) + if len(cfgs) == 0 { + return nil + } + + // all `IgnoreCheckingItems` of sub-task are same, so we take first one + checkingItems := config.FilterCheckingItems(cfgs[0].IgnoreCheckingItems) + if len(checkingItems) == 0 { + return nil + } + + c := NewChecker(cfgs, checkingItems) err := c.Init() if err != nil { diff --git a/dm/config/checking_item.go b/dm/config/checking_item.go new file mode 100644 index 0000000000..ae8305ab04 --- /dev/null +++ b/dm/config/checking_item.go @@ -0,0 +1,89 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "bytes" + "fmt" + + "github.com/pingcap/errors" +) + +// DM definition checking items +// refer github.com/pingcap/tidb-tools/pkg/check +const ( + AllChecking = "all" + DumpPrivilegeChecking = "dump_privilege" + ReplicationPrivilegeChecking = "replication_privilege" + VersionChecking = "version" + BinlogEnableChecking = "binlog_enable" + BinlogFormatChecking = "binlog_format" + BinlogRowImageChecking = "binlog_row_image" + TableSchemaChecking = "table_schema" + ShardTableSchemaChecking = "schema_of_shard_tables" + ShardAutoIncrementIDChecking = "auto_increment_ID" +) + +// AllCheckingItems contains all checking items +var AllCheckingItems = map[string]string{ + AllChecking: "all checking items", + DumpPrivilegeChecking: "dump privileges of source DB checking item", + ReplicationPrivilegeChecking: "replication privileges of source DB checking item", + VersionChecking: "MySQL/MariaDB version checking item", + BinlogEnableChecking: "binlog enable checking item", + BinlogFormatChecking: "binlog format checking item", + BinlogRowImageChecking: "binlog row image checking item", + TableSchemaChecking: "table schema compatibility checking item", + ShardTableSchemaChecking: "consistent schema of shard tables checking item", + ShardAutoIncrementIDChecking: "conflict auto increment ID of shard tables checking item", +} + +// ValidateCheckingItem validates checking item +func ValidateCheckingItem(item string) error { + if _, ok := AllCheckingItems[item]; ok { + return nil + } + + return errors.Errorf("checking item %s is not supported\n%s", item, SupportCheckingItems()) +} + +// SupportCheckingItems returns all supporting checking item +func SupportCheckingItems() string { + var buf bytes.Buffer + fmt.Fprintf(&buf, "************ supporting checking items ************\n name:\t\tdescription\n") + for name, desc := range AllCheckingItems { + fmt.Fprintf(&buf, "%s:\t%s\n", name, desc) + } + fmt.Fprintf(&buf, "************ supporting checking items ************\n") + return buf.String() +} + +// FilterCheckingItems filters ignored items from all checking items +func FilterCheckingItems(ignoredItems []string) map[string]string { + checkingItems := make(map[string]string) + for item, desc := range AllCheckingItems { + checkingItems[item] = desc + } + delete(checkingItems, AllChecking) + + for _, item := range ignoredItems { + if item == AllChecking { + return nil + } + + delete(checkingItems, item) + } + + return checkingItems +} diff --git a/dm/config/checking_item_test.go b/dm/config/checking_item_test.go new file mode 100644 index 0000000000..7570363972 --- /dev/null +++ b/dm/config/checking_item_test.go @@ -0,0 +1,57 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "testing" + + . "github.com/pingcap/check" +) + +func TestConfig(t *testing.T) { + TestingT(t) +} + +type testConfig struct{} + +var _ = Suite(&testConfig{}) + +func (t *testConfig) TestCheckingItems(c *C) { + for item := range AllCheckingItems { + c.Assert(ValidateCheckingItem(item), IsNil) + } + c.Assert(ValidateCheckingItem("xxx"), NotNil) + + // ignore all checking items + ignoredCheckingItems := []string{AllChecking} + c.Assert(FilterCheckingItems(ignoredCheckingItems), IsNil) + ignoredCheckingItems = append(ignoredCheckingItems, ShardTableSchemaChecking) + c.Assert(FilterCheckingItems(ignoredCheckingItems), IsNil) + + // ignore shard checking items + checkingItems := make(map[string]string) + for item, desc := range AllCheckingItems { + checkingItems[item] = desc + } + delete(checkingItems, AllChecking) + + c.Assert(FilterCheckingItems(ignoredCheckingItems[:0]), DeepEquals, checkingItems) + + delete(checkingItems, ShardTableSchemaChecking) + c.Assert(FilterCheckingItems(ignoredCheckingItems[1:]), DeepEquals, checkingItems) + + ignoredCheckingItems = append(ignoredCheckingItems, ShardAutoIncrementIDChecking) + delete(checkingItems, ShardAutoIncrementIDChecking) + c.Assert(FilterCheckingItems(ignoredCheckingItems[1:]), DeepEquals, checkingItems) +} diff --git a/dm/config/subtask.go b/dm/config/subtask.go index 33e4fdd1b9..ceee578361 100644 --- a/dm/config/subtask.go +++ b/dm/config/subtask.go @@ -90,6 +90,8 @@ type SubTaskConfig struct { Name string `toml:"name" json:"name"` Mode string `toml:"mode" json:"mode"` + // treat it as hidden configuration + IgnoreCheckingItems []string `toml:"ignore-checking-items" json:"ignore-checking-items"` // it represents a MySQL/MariaDB instance or a replica group SourceID string `toml:"source-id" json:"source-id"` ServerID int `toml:"server-id" json:"server-id"` diff --git a/dm/config/task.go b/dm/config/task.go index 9ae7ebb4a8..a90a00f975 100644 --- a/dm/config/task.go +++ b/dm/config/task.go @@ -217,6 +217,8 @@ type TaskConfig struct { Name string `yaml:"name"` TaskMode string `yaml:"task-mode"` IsSharding bool `yaml:"is-sharding"` + // treat it as hidden configuration + IgnoreCheckingItems []string `yaml:"ignore-checking-items"` // we store detail status in meta // don't save configuration into it MetaSchema string `yaml:"meta-schema"` @@ -311,6 +313,12 @@ func (c *TaskConfig) adjust() error { return errors.New("please specify right task-mode, support `full`, `incremental`, `all`") } + for _, item := range c.IgnoreCheckingItems { + if err := ValidateCheckingItem(item); err != nil { + return errors.Trace(err) + } + } + if c.OnlineDDLScheme != "" && c.OnlineDDLScheme != PT && c.OnlineDDLScheme != GHOST { return errors.NotSupportedf("online scheme %s", c.OnlineDDLScheme) } @@ -431,6 +439,7 @@ func (c *TaskConfig) SubTaskConfigs(sources map[string]DBConfig) ([]*SubTaskConf cfg := NewSubTaskConfig() cfg.IsSharding = c.IsSharding cfg.OnlineDDLScheme = c.OnlineDDLScheme + cfg.IgnoreCheckingItems = c.IgnoreCheckingItems cfg.Name = c.Name cfg.Mode = c.TaskMode cfg.CaseSensitive = c.CaseSensitive diff --git a/go.mod b/go.mod index 51d3d3e4bb..f3e84015a2 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/pingcap/kvproto v0.0.0-20181206061346-54cf0a0dfe55 // indirect github.com/pingcap/parser v0.0.0-20181205023950-c563561800a2 github.com/pingcap/tidb v0.0.0-20181206093020-98c72c6a8ee3 - github.com/pingcap/tidb-tools v2.1.3-0.20190115072802-b674be072353+incompatible + github.com/pingcap/tidb-tools v2.1.3-0.20190305052038-e6c996e1e2ee+incompatible github.com/pingcap/tipb v0.0.0-20181126132056-a7fd2aaa9719 // indirect github.com/prometheus/client_golang v0.9.1 github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 diff --git a/go.sum b/go.sum index 1aed3b88b8..ff1fe3d71b 100644 --- a/go.sum +++ b/go.sum @@ -132,8 +132,8 @@ github.com/pingcap/pd v2.1.0-rc.4+incompatible/go.mod h1:nD3+EoYes4+aNNODO99ES59 github.com/pingcap/tidb v0.0.0-20181206093020-98c72c6a8ee3 h1:VW18UjQwAwsvod4oq/2bigmniV8a2dss44LRADF/UFg= github.com/pingcap/tidb v0.0.0-20181206093020-98c72c6a8ee3/go.mod h1:ePvdDXDo3CWkhAlwgnVgj7w3PkpXYum72/fHVng0wQI= github.com/pingcap/tidb-tools v0.0.0-20181112132202-4860a0d5de03/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= -github.com/pingcap/tidb-tools v2.1.3-0.20190115072802-b674be072353+incompatible h1:pMaTt63rmQ1wDZo2E3Iyci+KqP2C5yKvfrtRwWSx5Rs= -github.com/pingcap/tidb-tools v2.1.3-0.20190115072802-b674be072353+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= +github.com/pingcap/tidb-tools v2.1.3-0.20190305052038-e6c996e1e2ee+incompatible h1:Xiq43tSxpLZcypX3GnAleRoRH+0htKlT25BSN+51Ccg= +github.com/pingcap/tidb-tools v2.1.3-0.20190305052038-e6c996e1e2ee+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= github.com/pingcap/tipb v0.0.0-20170310053819-1043caee48da/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= github.com/pingcap/tipb v0.0.0-20181012112600-11e33c750323/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= github.com/pingcap/tipb v0.0.0-20181126132056-a7fd2aaa9719 h1:MEHhwNcWuuoJJvYxMaWIXFp+BvK/WORPiiwmSOS0cA4=