Skip to content

Commit

Permalink
checker: allow to ignore some checking items (pingcap#65)
Browse files Browse the repository at this point in the history
  • Loading branch information
IANTHEREAL authored Mar 7, 2019
1 parent f80e6fb commit 975b27e
Show file tree
Hide file tree
Showing 9 changed files with 270 additions and 18 deletions.
41 changes: 41 additions & 0 deletions checker/check_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package checker

import (
"context"
"testing"

tc "github.com/pingcap/check"
"github.com/pingcap/dm/dm/config"
)

func TestChecker(t *testing.T) {
tc.TestingT(t)
}

type testChecker struct{}

var _ = tc.Suite(&testChecker{})

func (t *testChecker) TestCheckSyncConfig(c *tc.C) {
c.Assert(CheckSyncConfig(context.Background(), nil), tc.IsNil)

cfgs := []*config.SubTaskConfig{
{
IgnoreCheckingItems: []string{config.AllChecking},
},
}
c.Assert(CheckSyncConfig(context.Background(), cfgs), tc.IsNil)
}
72 changes: 58 additions & 14 deletions checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package checker

import (
"bytes"
"database/sql"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -53,17 +54,19 @@ type Checker struct {

instances []*mysqlInstance

checkList []check.Checker
result struct {
checkList []check.Checker
checkingItems map[string]string
result struct {
sync.RWMutex
detail *check.Results
}
}

// NewChecker returns a checker
func NewChecker(cfgs []*config.SubTaskConfig) *Checker {
func NewChecker(cfgs []*config.SubTaskConfig, checkingItems map[string]string) *Checker {
c := &Checker{
instances: make([]*mysqlInstance, 0, len(cfgs)),
instances: make([]*mysqlInstance, 0, len(cfgs)),
checkingItems: checkingItems,
}

for _, cfg := range cfgs {
Expand All @@ -82,6 +85,9 @@ func (c *Checker) Init() error {
shardingCounter := make(map[string]int)
dbs := make(map[string]*sql.DB)
columnMapping := make(map[string]*column.Mapping)
_, checkingShardID := c.checkingItems[config.ShardAutoIncrementIDChecking]
_, checkingShard := c.checkingItems[config.ShardTableSchemaChecking]
_, checkSchema := c.checkingItems[config.TableSchemaChecking]

for _, instance := range c.instances {
bw := filter.New(instance.cfg.CaseSensitive, instance.cfg.BWList)
Expand Down Expand Up @@ -125,6 +131,29 @@ func (c *Checker) Init() error {
return errors.Trace(err)
}

if _, ok := c.checkingItems[config.VersionChecking]; ok {
c.checkList = append(c.checkList, check.NewMySQLVersionChecker(instance.sourceDB, instance.sourceDBinfo))
}
if _, ok := c.checkingItems[config.BinlogEnableChecking]; ok {
c.checkList = append(c.checkList, check.NewMySQLBinlogEnableChecker(instance.sourceDB, instance.sourceDBinfo))
}
if _, ok := c.checkingItems[config.BinlogFormatChecking]; ok {
c.checkList = append(c.checkList, check.NewMySQLBinlogFormatChecker(instance.sourceDB, instance.sourceDBinfo))
}
if _, ok := c.checkingItems[config.BinlogRowImageChecking]; ok {
c.checkList = append(c.checkList, check.NewMySQLBinlogRowImageChecker(instance.sourceDB, instance.sourceDBinfo))
}
if _, ok := c.checkingItems[config.DumpPrivilegeChecking]; ok {
c.checkList = append(c.checkList, check.NewSourceDumpPrivilegeChecker(instance.sourceDB, instance.sourceDBinfo))
}
if _, ok := c.checkingItems[config.ReplicationPrivilegeChecking]; ok {
c.checkList = append(c.checkList, check.NewSourceReplicationPrivilegeChecker(instance.sourceDB, instance.sourceDBinfo))
}

if !checkingShard && !checkSchema {
continue
}

mapping, err := utils.FetchTargetDoTables(instance.sourceDB, bw, r)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -155,24 +184,39 @@ func (c *Checker) Init() error {
}
dbs[instance.cfg.SourceID] = instance.sourceDB

c.checkList = append(c.checkList, check.NewMySQLBinlogEnableChecker(instance.sourceDB, instance.sourceDBinfo))
c.checkList = append(c.checkList, check.NewMySQLBinlogFormatChecker(instance.sourceDB, instance.sourceDBinfo))
c.checkList = append(c.checkList, check.NewMySQLBinlogRowImageChecker(instance.sourceDB, instance.sourceDBinfo))
c.checkList = append(c.checkList, check.NewSourcePrivilegeChecker(instance.sourceDB, instance.sourceDBinfo))
c.checkList = append(c.checkList, check.NewTablesChecker(instance.sourceDB, instance.sourceDBinfo, checkTables))
if checkSchema {
c.checkList = append(c.checkList, check.NewTablesChecker(instance.sourceDB, instance.sourceDBinfo, checkTables))
}
}

for name, shardingSet := range sharding {
if shardingCounter[name] <= 1 {
continue
}
if checkingShard {
for name, shardingSet := range sharding {
if shardingCounter[name] <= 1 {
continue
}

c.checkList = append(c.checkList, check.NewShardingTablesCheck(name, dbs, shardingSet, columnMapping))
c.checkList = append(c.checkList, check.NewShardingTablesCheck(name, dbs, shardingSet, columnMapping, checkingShardID))
}
}

log.Infof(c.displayCheckingItems())
return nil
}

func (c *Checker) displayCheckingItems() string {
if len(c.checkList) == 0 {
return "not found any checking items\n"
}

var buf bytes.Buffer
fmt.Fprintf(&buf, "\n************ task %s checking items ************\n", c.instances[0].cfg.Name)
for _, checkFunc := range c.checkList {
fmt.Fprintf(&buf, "%s\n", checkFunc.Name())
}
fmt.Fprintf(&buf, "************ task %s checking items ************", c.instances[0].cfg.Name)
return buf.String()
}

// Process implements Unit interface
func (c *Checker) Process(ctx context.Context, pr chan pb.ProcessResult) {
cctx, cancel := context.WithTimeout(ctx, time.Minute)
Expand Down
12 changes: 11 additions & 1 deletion checker/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,17 @@ var (

// CheckSyncConfig checks synchronization configuration
func CheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig) error {
c := NewChecker(cfgs)
if len(cfgs) == 0 {
return nil
}

// all `IgnoreCheckingItems` of sub-task are same, so we take first one
checkingItems := config.FilterCheckingItems(cfgs[0].IgnoreCheckingItems)
if len(checkingItems) == 0 {
return nil
}

c := NewChecker(cfgs, checkingItems)

err := c.Init()
if err != nil {
Expand Down
89 changes: 89 additions & 0 deletions dm/config/checking_item.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package config

import (
"bytes"
"fmt"

"github.com/pingcap/errors"
)

// DM definition checking items
// refer github.com/pingcap/tidb-tools/pkg/check
const (
AllChecking = "all"
DumpPrivilegeChecking = "dump_privilege"
ReplicationPrivilegeChecking = "replication_privilege"
VersionChecking = "version"
BinlogEnableChecking = "binlog_enable"
BinlogFormatChecking = "binlog_format"
BinlogRowImageChecking = "binlog_row_image"
TableSchemaChecking = "table_schema"
ShardTableSchemaChecking = "schema_of_shard_tables"
ShardAutoIncrementIDChecking = "auto_increment_ID"
)

// AllCheckingItems contains all checking items
var AllCheckingItems = map[string]string{
AllChecking: "all checking items",
DumpPrivilegeChecking: "dump privileges of source DB checking item",
ReplicationPrivilegeChecking: "replication privileges of source DB checking item",
VersionChecking: "MySQL/MariaDB version checking item",
BinlogEnableChecking: "binlog enable checking item",
BinlogFormatChecking: "binlog format checking item",
BinlogRowImageChecking: "binlog row image checking item",
TableSchemaChecking: "table schema compatibility checking item",
ShardTableSchemaChecking: "consistent schema of shard tables checking item",
ShardAutoIncrementIDChecking: "conflict auto increment ID of shard tables checking item",
}

// ValidateCheckingItem validates checking item
func ValidateCheckingItem(item string) error {
if _, ok := AllCheckingItems[item]; ok {
return nil
}

return errors.Errorf("checking item %s is not supported\n%s", item, SupportCheckingItems())
}

// SupportCheckingItems returns all supporting checking item
func SupportCheckingItems() string {
var buf bytes.Buffer
fmt.Fprintf(&buf, "************ supporting checking items ************\n name:\t\tdescription\n")
for name, desc := range AllCheckingItems {
fmt.Fprintf(&buf, "%s:\t%s\n", name, desc)
}
fmt.Fprintf(&buf, "************ supporting checking items ************\n")
return buf.String()
}

// FilterCheckingItems filters ignored items from all checking items
func FilterCheckingItems(ignoredItems []string) map[string]string {
checkingItems := make(map[string]string)
for item, desc := range AllCheckingItems {
checkingItems[item] = desc
}
delete(checkingItems, AllChecking)

for _, item := range ignoredItems {
if item == AllChecking {
return nil
}

delete(checkingItems, item)
}

return checkingItems
}
57 changes: 57 additions & 0 deletions dm/config/checking_item_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package config

import (
"testing"

. "github.com/pingcap/check"
)

func TestConfig(t *testing.T) {
TestingT(t)
}

type testConfig struct{}

var _ = Suite(&testConfig{})

func (t *testConfig) TestCheckingItems(c *C) {
for item := range AllCheckingItems {
c.Assert(ValidateCheckingItem(item), IsNil)
}
c.Assert(ValidateCheckingItem("xxx"), NotNil)

// ignore all checking items
ignoredCheckingItems := []string{AllChecking}
c.Assert(FilterCheckingItems(ignoredCheckingItems), IsNil)
ignoredCheckingItems = append(ignoredCheckingItems, ShardTableSchemaChecking)
c.Assert(FilterCheckingItems(ignoredCheckingItems), IsNil)

// ignore shard checking items
checkingItems := make(map[string]string)
for item, desc := range AllCheckingItems {
checkingItems[item] = desc
}
delete(checkingItems, AllChecking)

c.Assert(FilterCheckingItems(ignoredCheckingItems[:0]), DeepEquals, checkingItems)

delete(checkingItems, ShardTableSchemaChecking)
c.Assert(FilterCheckingItems(ignoredCheckingItems[1:]), DeepEquals, checkingItems)

ignoredCheckingItems = append(ignoredCheckingItems, ShardAutoIncrementIDChecking)
delete(checkingItems, ShardAutoIncrementIDChecking)
c.Assert(FilterCheckingItems(ignoredCheckingItems[1:]), DeepEquals, checkingItems)
}
2 changes: 2 additions & 0 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ type SubTaskConfig struct {

Name string `toml:"name" json:"name"`
Mode string `toml:"mode" json:"mode"`
// treat it as hidden configuration
IgnoreCheckingItems []string `toml:"ignore-checking-items" json:"ignore-checking-items"`
// it represents a MySQL/MariaDB instance or a replica group
SourceID string `toml:"source-id" json:"source-id"`
ServerID int `toml:"server-id" json:"server-id"`
Expand Down
9 changes: 9 additions & 0 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ type TaskConfig struct {
Name string `yaml:"name"`
TaskMode string `yaml:"task-mode"`
IsSharding bool `yaml:"is-sharding"`
// treat it as hidden configuration
IgnoreCheckingItems []string `yaml:"ignore-checking-items"`
// we store detail status in meta
// don't save configuration into it
MetaSchema string `yaml:"meta-schema"`
Expand Down Expand Up @@ -311,6 +313,12 @@ func (c *TaskConfig) adjust() error {
return errors.New("please specify right task-mode, support `full`, `incremental`, `all`")
}

for _, item := range c.IgnoreCheckingItems {
if err := ValidateCheckingItem(item); err != nil {
return errors.Trace(err)
}
}

if c.OnlineDDLScheme != "" && c.OnlineDDLScheme != PT && c.OnlineDDLScheme != GHOST {
return errors.NotSupportedf("online scheme %s", c.OnlineDDLScheme)
}
Expand Down Expand Up @@ -431,6 +439,7 @@ func (c *TaskConfig) SubTaskConfigs(sources map[string]DBConfig) ([]*SubTaskConf
cfg := NewSubTaskConfig()
cfg.IsSharding = c.IsSharding
cfg.OnlineDDLScheme = c.OnlineDDLScheme
cfg.IgnoreCheckingItems = c.IgnoreCheckingItems
cfg.Name = c.Name
cfg.Mode = c.TaskMode
cfg.CaseSensitive = c.CaseSensitive
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/pingcap/kvproto v0.0.0-20181206061346-54cf0a0dfe55 // indirect
github.com/pingcap/parser v0.0.0-20181205023950-c563561800a2
github.com/pingcap/tidb v0.0.0-20181206093020-98c72c6a8ee3
github.com/pingcap/tidb-tools v2.1.3-0.20190115072802-b674be072353+incompatible
github.com/pingcap/tidb-tools v2.1.3-0.20190305052038-e6c996e1e2ee+incompatible
github.com/pingcap/tipb v0.0.0-20181126132056-a7fd2aaa9719 // indirect
github.com/prometheus/client_golang v0.9.1
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ github.com/pingcap/pd v2.1.0-rc.4+incompatible/go.mod h1:nD3+EoYes4+aNNODO99ES59
github.com/pingcap/tidb v0.0.0-20181206093020-98c72c6a8ee3 h1:VW18UjQwAwsvod4oq/2bigmniV8a2dss44LRADF/UFg=
github.com/pingcap/tidb v0.0.0-20181206093020-98c72c6a8ee3/go.mod h1:ePvdDXDo3CWkhAlwgnVgj7w3PkpXYum72/fHVng0wQI=
github.com/pingcap/tidb-tools v0.0.0-20181112132202-4860a0d5de03/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tidb-tools v2.1.3-0.20190115072802-b674be072353+incompatible h1:pMaTt63rmQ1wDZo2E3Iyci+KqP2C5yKvfrtRwWSx5Rs=
github.com/pingcap/tidb-tools v2.1.3-0.20190115072802-b674be072353+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tidb-tools v2.1.3-0.20190305052038-e6c996e1e2ee+incompatible h1:Xiq43tSxpLZcypX3GnAleRoRH+0htKlT25BSN+51Ccg=
github.com/pingcap/tidb-tools v2.1.3-0.20190305052038-e6c996e1e2ee+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tipb v0.0.0-20170310053819-1043caee48da/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tipb v0.0.0-20181012112600-11e33c750323/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tipb v0.0.0-20181126132056-a7fd2aaa9719 h1:MEHhwNcWuuoJJvYxMaWIXFp+BvK/WORPiiwmSOS0cA4=
Expand Down

0 comments on commit 975b27e

Please sign in to comment.