Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

*: refactor subtask management and communication model #116

Merged
merged 61 commits into from
May 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
d340a5c
*: add queryWorkConfig rpc interface
IANTHEREAL Mar 28, 2019
b152a71
*: refine protobuf definition and refactor dm/worker/meta.go
IANTHEREAL Apr 4, 2019
1f76dd6
*: refine worker
IANTHEREAL Apr 8, 2019
46b6c70
*: Asynchronous dm-worker controller
IANTHEREAL Apr 8, 2019
017683c
*: fix dm-worker meta
IANTHEREAL Apr 9, 2019
2a0d89a
*: add log file
IANTHEREAL Apr 10, 2019
982b39c
*: fix log
IANTHEREAL Apr 10, 2019
1bdea46
revert subtask.go
IANTHEREAL Apr 11, 2019
7ccb5df
*: revert "revert subtask.go"
IANTHEREAL Apr 11, 2019
0a7fcc5
*: add two method for WorkerServer interface
IANTHEREAL Apr 11, 2019
620469b
*: add logID in task operation method
IANTHEREAL Apr 11, 2019
0beef4c
*: remove useless grpc interface
IANTHEREAL Apr 11, 2019
2c81f5c
*: change dm-master commucation way
IANTHEREAL Apr 11, 2019
3e8d554
*: change dm-worker protobuf definition
IANTHEREAL Apr 12, 2019
9bd1be6
*: fix dm-worker code
IANTHEREAL Apr 12, 2019
0ed8655
*: correct code of dm-worker
IANTHEREAL Apr 12, 2019
ede3c79
*: complete code
IANTHEREAL Apr 13, 2019
915d896
fix meta_test.go
IANTHEREAL Apr 17, 2019
5ed4124
*: add test for log
IANTHEREAL Apr 17, 2019
8399ea3
*: add test for log
IANTHEREAL Apr 18, 2019
333f395
Merge branch 'master' of /~https://github.com/pingcap/dm into ian/refin…
IANTHEREAL Apr 18, 2019
77600b7
*: add unit test for subtask
IANTHEREAL Apr 19, 2019
a56cbc9
*: correct ut of subtask
IANTHEREAL Apr 19, 2019
37aee2e
*: add more test for subtask
IANTHEREAL Apr 20, 2019
086e042
*: fix worker close
IANTHEREAL Apr 20, 2019
2d5db15
*: fix ci
IANTHEREAL Apr 20, 2019
f23fde5
*: fix ci
IANTHEREAL Apr 20, 2019
aa177e6
fix ci
IANTHEREAL Apr 21, 2019
c88b0f1
*: fix ci
IANTHEREAL Apr 21, 2019
40fccfd
correct code
IANTHEREAL Apr 21, 2019
1f22f5f
*: test
IANTHEREAL Apr 21, 2019
db91407
Revert "*: test"
IANTHEREAL Apr 21, 2019
82fa7e4
Merge branch 'master' of /~https://github.com/pingcap/dm into ian/refin…
IANTHEREAL Apr 22, 2019
5f8cc0b
*: fix dmctl.go in tests
IANTHEREAL Apr 22, 2019
0e65517
*: add error check
IANTHEREAL Apr 22, 2019
662e871
*: address comments
IANTHEREAL Apr 30, 2019
ed9237b
Merge branch 'master' into ian/refine-task
IANTHEREAL Apr 30, 2019
b65309a
*: refine format
IANTHEREAL Apr 30, 2019
1a55881
address comments
IANTHEREAL May 8, 2019
50860ea
*: address comments
IANTHEREAL May 8, 2019
55f960e
worker: revert modifications of subtask
IANTHEREAL May 8, 2019
88a91a2
address comment
IANTHEREAL May 8, 2019
a2398d4
add subtask test
IANTHEREAL May 9, 2019
cfd244b
Merge branch 'master' into ian/refine-task
IANTHEREAL May 9, 2019
a31abe5
*: save code
IANTHEREAL May 9, 2019
aa64ede
Merge branch 'ian/refine-task' of /~https://github.com/GregoryIan/dm in…
IANTHEREAL May 9, 2019
8614413
save code
IANTHEREAL May 9, 2019
031b8cd
*: fix status display after task paused, but still not display after …
IANTHEREAL May 9, 2019
517fefb
Merge branch 'master' into ian/refine-task
IANTHEREAL May 10, 2019
2419f25
*: refine log and function
IANTHEREAL May 14, 2019
0112008
Merge branch 'master' into ian/refine-task
IANTHEREAL May 14, 2019
8fccf67
Merge branch 'master' into ian/refine-task
amyangfei May 14, 2019
275d971
*: address comments
IANTHEREAL May 14, 2019
0472642
Merge branch 'ian/refine-task' of /~https://github.com/GregoryIan/dm in…
IANTHEREAL May 14, 2019
4392fa0
revert some code
IANTHEREAL May 14, 2019
cb78653
Merge branch 'master' into ian/refine-task
amyangfei May 14, 2019
85bb39e
address comments
IANTHEREAL May 20, 2019
b7d2bdb
Merge branch 'master' into ian/refine-task
IANTHEREAL May 20, 2019
a379e25
*: add some logs and try to fix deadlock
IANTHEREAL May 21, 2019
d9fbda9
address comment
IANTHEREAL May 21, 2019
227a77c
Merge branch 'ian/refine-task' of /~https://github.com/GregoryIan/dm in…
IANTHEREAL May 21, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,13 +330,13 @@ func (c *SubTaskConfig) DecryptPassword() (*SubTaskConfig, error) {
if len(clone.To.Password) > 0 {
pswdTo, err = utils.Decrypt(clone.To.Password)
if err != nil {
return nil, errors.Annotatef(err, "downstream DB")
return nil, errors.Annotatef(err, "downstream DB password %s", clone.To.Password)
}
}
if len(clone.From.Password) > 0 {
pswdFrom, err = utils.Decrypt(clone.From.Password)
if err != nil {
return nil, errors.Annotatef(err, "source DB")
return nil, errors.Annotatef(err, "source DB password %s", clone.From.Password)
}
}
clone.From.Password = pswdFrom
Expand Down
101 changes: 71 additions & 30 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,14 +246,9 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S
return
}
workerResp, err := cli.StartSubTask(ctx, &pb.StartSubTaskRequest{Task: stCfgToml})
if err != nil {
workerResp = &pb.CommonWorkerResponse{
Result: false,
Msg: errors.ErrorStack(err),
}
}
workerResp.Worker = worker
workerRespCh <- workerResp
workerResp = s.handleOperationResult(ctx, cli, stCfg.Name, err, workerResp)
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
workerResp.Meta.Worker = worker
workerRespCh <- workerResp.Meta
}(stCfg)
}
wg.Wait()
Expand Down Expand Up @@ -318,23 +313,20 @@ func (s *Server) OperateTask(ctx context.Context, req *pb.OperateTaskRequest) (*
cli, ok := s.workerClients[worker]
if !ok {
workerResp := &pb.OperateSubTaskResponse{
Op: req.Op,
Result: false,
Worker: worker,
Msg: fmt.Sprintf("%s relevant worker-client not found", worker),
Meta: &pb.CommonWorkerResponse{
Result: false,
Worker: worker,
Msg: fmt.Sprintf("%s relevant worker-client not found", worker),
},
Op: req.Op,
}
workerRespCh <- workerResp
return
}
workerResp, err := cli.OperateSubTask(ctx, subReq)
if err != nil {
workerResp = &pb.OperateSubTaskResponse{
Result: false,
Msg: errors.ErrorStack(err),
}
}
workerResp = s.handleOperationResult(ctx, cli, req.Name, err, workerResp)
workerResp.Op = req.Op
workerResp.Worker = worker
workerResp.Meta.Worker = worker
workerRespCh <- workerResp
}(worker)
}
Expand All @@ -344,9 +336,9 @@ func (s *Server) OperateTask(ctx context.Context, req *pb.OperateTaskRequest) (*
workerRespMap := make(map[string]*pb.OperateSubTaskResponse, len(workers))
for len(workerRespCh) > 0 {
workerResp := <-workerRespCh
workerRespMap[workerResp.Worker] = workerResp
if len(workerResp.Msg) == 0 { // no error occurred
validWorkers = append(validWorkers, workerResp.Worker)
workerRespMap[workerResp.Meta.Worker] = workerResp
if len(workerResp.Meta.Msg) == 0 { // no error occurred
validWorkers = append(validWorkers, workerResp.Meta.Worker)
}
}

Expand Down Expand Up @@ -429,14 +421,9 @@ func (s *Server) UpdateTask(ctx context.Context, req *pb.UpdateTaskRequest) (*pb
return
}
workerResp, err := cli.UpdateSubTask(ctx, &pb.UpdateSubTaskRequest{Task: stCfgToml})
if err != nil {
workerResp = &pb.CommonWorkerResponse{
Result: false,
Msg: errors.ErrorStack(err),
}
}
workerResp.Worker = worker
workerRespCh <- workerResp
workerResp = s.handleOperationResult(ctx, cli, stCfg.Name, err, workerResp)
workerResp.Meta.Worker = worker
workerRespCh <- workerResp.Meta
}(stCfg)
}
wg.Wait()
Expand Down Expand Up @@ -1752,3 +1739,57 @@ func (s *Server) generateSubTask(ctx context.Context, task string) (*config.Task

return cfg, stCfgs, nil
}

var (
maxRetryNum = 30
retryInterval = time.Second
)

func (s *Server) waitOperationOk(ctx context.Context, cli pb.WorkerClient, name string, opLogID int64) error {
request := &pb.QueryTaskOperationRequest{
Name: name,
LogID: opLogID,
}

for num := 0; num < maxRetryNum; num++ {
res, err := cli.QueryTaskOperation(ctx, request)
if err != nil {
log.Errorf("fail to query task operation %v", err)
} else if res.Log.Success {
return nil
} else if len(res.Log.Message) != 0 {
return errors.New(res.Log.Message)
}

log.Infof("wait task %s op log %d, current result %+v", name, opLogID, res)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(retryInterval):
}

}

return errors.New("request is timeout, but request may be successful")
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
}

func (s *Server) handleOperationResult(ctx context.Context, cli pb.WorkerClient, name string, err error, response *pb.OperateSubTaskResponse) *pb.OperateSubTaskResponse {
if err != nil {
return &pb.OperateSubTaskResponse{
Meta: &pb.CommonWorkerResponse{
Result: false,
Msg: errors.ErrorStack(err),
},
}
}

err = s.waitOperationOk(ctx, cli, name, response.LogID)
if err != nil {
response.Meta = &pb.CommonWorkerResponse{
Result: false,
Msg: errors.ErrorStack(err),
}
}

return response
}
Loading