From 2817080a50a5d7b796dad0d8afcbd3c44b27cca6 Mon Sep 17 00:00:00 2001 From: shuntaka9576 Date: Sun, 5 Jun 2022 07:43:54 +0900 Subject: [PATCH] refactor(batch): commonize delete and restore --- batch.go | 120 ++++++++++++++++++---------------------- cli/delete.go | 51 ----------------- cli/file.go | 27 +++++++++ cli/restore.go | 44 --------------- cli/write.go | 135 +++++++++++++++++++++++++++++++++++++++++++++ client.go | 46 ++++++++------- cmd/ddbrew/main.go | 6 +- delete.go | 21 ------- restore.go | 21 ------- worker.go | 4 +- 10 files changed, 243 insertions(+), 232 deletions(-) delete mode 100644 cli/delete.go create mode 100644 cli/file.go delete mode 100644 cli/restore.go create mode 100644 cli/write.go delete mode 100644 delete.go delete mode 100644 restore.go diff --git a/batch.go b/batch.go index 7229de3..92a3989 100644 --- a/batch.go +++ b/batch.go @@ -1,117 +1,103 @@ package ddbrew import ( - "fmt" + "context" "os" "runtime" "sync/atomic" "time" ) -type BatchWriteConsoleOpt struct { - Table *Table - File *os.File - DDBAction DDBAction - LimitUnit *int +type BatchWriter struct { + Table *Table + File *os.File + DDBAction DDBAction + LimitUnit *int + RemainCount *int64 + Results chan *BatchResult + Done chan struct{} } type BatchResult struct { - Content *BatchWriteOutput + Content BatchWriteOutput Error error } -func batchWriteWithConsle(opt *BatchWriteConsoleOpt) error { +func (b *BatchWriter) BatchWrite(ctx context.Context) error { generator := BatchRequestGenerator{} generator.Init(&BatchRequestGeneratorOption{ - File: opt.File, - Action: opt.DDBAction, - Table: opt.Table, + File: b.File, + Action: b.DDBAction, + Table: b.Table, }) - results := make(chan *BatchResult) - done := make(chan struct{}) reqs := make(chan BatchRequest) procs := runtime.NumCPU() for i := 0; i < procs; i++ { - go worker(reqs, results) + go worker(ctx, reqs, b.Results) } - var remainCount int64 = 0 - - if opt.LimitUnit == nil { + if b.LimitUnit == nil { go func() { + LOOP: for { - batchReq, err := generator.generate(0) - if err != nil { - if batchReq.Number() == 0 && err == ErrBatchEOF { - close(done) + select { + case <-ctx.Done(): + break LOOP + default: + batchReq, err := generator.generate(0) + if err != nil { + if batchReq.Number() == 0 && err == ErrBatchEOF { + close(b.Done) - break + break LOOP + } } + reqs <- batchReq + atomic.AddInt64(b.RemainCount, 1) } - reqs <- batchReq - atomic.AddInt64(&remainCount, 1) } }() } else { go func() { ticker := time.NewTicker(1 * time.Second) + LOOP: for { - <-ticker.C - limit := *opt.LimitUnit + select { + case <-ctx.Done(): + break LOOP + case <-ticker.C: + limit := *b.LimitUnit + + for { + reqUnitSize := DEFAULT_UNIT_SIZE + if limit < reqUnitSize { + reqUnitSize = limit + } - for { - reqUnitSize := DEFAULT_UNIT_SIZE - if limit < reqUnitSize { - reqUnitSize = limit - } + batchReq, err := generator.generate(reqUnitSize) + if err != nil { + if batchReq.Number() == 0 && err == ErrBatchEOF { + close(b.Done) - batchReq, err := generator.generate(reqUnitSize) - if err != nil { - if batchReq.Number() == 0 && err == ErrBatchEOF { - close(done) + break LOOP + } + } + reqs <- batchReq + atomic.AddInt64(b.RemainCount, 1) + limit -= batchReq.totalWU + if limit == 0 { break } } - reqs <- batchReq - atomic.AddInt64(&remainCount, 1) - - limit -= batchReq.totalWU - if limit == 0 { - break - } } } }() } - successNum, unprocessedNum := 0, 0 - for { - select { - case result := <-results: - if result != nil { - atomic.AddInt64(&remainCount, -1) - - if result.Content != nil { - successNum += result.Content.SuccessCount - unprocessedNum += len(result.Content.UnprocessedRecord) - - fmt.Printf("\r%d, %d", successNum, unprocessedNum) - } - - if result.Error != nil { - return result.Error - } - } - case <-done: - if remainCount == 0 { - return nil - } - } - } - + return nil } diff --git a/cli/delete.go b/cli/delete.go deleted file mode 100644 index 34524fd..0000000 --- a/cli/delete.go +++ /dev/null @@ -1,51 +0,0 @@ -package cli - -import ( - "context" - "os" - - "github.com/aws/aws-sdk-go-v2/service/dynamodb" - "github.com/pkg/errors" - "github.com/shuntaka9576/ddbrew" -) - -type DeleteOption struct { - TableName string - FilePath string - DryRun bool - Limit int -} - -func Delete(ctx context.Context, opt *DeleteOption) error { - var f *os.File - f, err := os.Open(opt.FilePath) - if err != nil { - return err - } - - tinfo, err := ddbrew.DdbClient.DescribeTable(ctx, &dynamodb.DescribeTableInput{ - TableName: &opt.TableName, - }) - if err != nil { - return errors.Wrap(ErrorDescribeTable, err.Error()) - } - table := &ddbrew.Table{} - table.Init(tinfo) - - var limitUnit *int = nil - if opt.Limit > 0 { - limitUnit = &opt.Limit - } - - err = ddbrew.Delete(ctx, &ddbrew.DeleteOption{ - Table: table, - File: f, - LimitUnit: limitUnit, - }) - - if err != nil { - return err - } - - return nil -} diff --git a/cli/file.go b/cli/file.go new file mode 100644 index 0000000..540b731 --- /dev/null +++ b/cli/file.go @@ -0,0 +1,27 @@ +package cli + +import ( + "bufio" + "io" + "os" +) + +func countLines(f *os.File) (lines int) { + defer f.Seek(0, 0) + + re := bufio.NewReader(f) + + for { + _, err := re.ReadString('\n') + if err != nil { + if err == io.EOF { + + break + } + } + lines += 1 + } + + return lines + +} diff --git a/cli/restore.go b/cli/restore.go deleted file mode 100644 index 76ef2ce..0000000 --- a/cli/restore.go +++ /dev/null @@ -1,44 +0,0 @@ -package cli - -import ( - "context" - "os" - - "github.com/shuntaka9576/ddbrew" -) - -type RestoreOption struct { - TableName string - FilePath string - DryRun bool - Limit int -} - -func Restore(ctx context.Context, opt *RestoreOption) error { - var f *os.File - f, err := os.Open(opt.FilePath) - if err != nil { - return err - } - - table := ddbrew.Table{ - Name: opt.TableName, - } - - var limitUnit *int = nil - if opt.Limit > 0 { - limitUnit = &opt.Limit - } - - err = ddbrew.Restore(ctx, &ddbrew.RestoreOption{ - Table: &table, - File: f, - LimitUnit: limitUnit, - }) - - if err != nil { - return err - } - - return nil -} diff --git a/cli/write.go b/cli/write.go new file mode 100644 index 0000000..cbd78be --- /dev/null +++ b/cli/write.go @@ -0,0 +1,135 @@ +package cli + +import ( + "context" + "fmt" + "os" + "sync/atomic" + "time" + + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/pkg/errors" + "github.com/shuntaka9576/ddbrew" +) + +type WriteOption struct { + TableName string + FilePath string + DryRun bool + Limit int + Action ddbrew.DDBAction +} + +func Write(ctx context.Context, opt *WriteOption) error { + f, err := os.Open(opt.FilePath) + if err != nil { + return err + } + lines := countLines(f) + + info, err := ddbrew.DdbClient.DescribeTable(ctx, &dynamodb.DescribeTableInput{ + TableName: &opt.TableName, + }) + if err != nil { + return errors.Wrap(ErrorDescribeTable, err.Error()) + } + + table := ddbrew.Table{} + table.Init(info) + + if opt.DryRun { + result, err := ddbrew.Simulate(&ddbrew.SimulateOpt{Reader: f, Mode: table.Mode}) + if err != nil { + return err + } + + size := ddbrew.PrittyPrintBytes(result.TotalItemSize) + fmt.Printf("Total item size: %s\n", *size) + + switch table.Mode { + case ddbrew.Provisioned: + fmt.Printf("Total to consume: %d WCU\n", *result.ConsumeWCU) + case ddbrew.OnDemand: + fmt.Printf("Total to consume: %d WRU\n", *result.ConsumeWRU) + } + + return nil + } + + var limitUnit *int = nil + if opt.Limit > 0 { + limitUnit = &opt.Limit + } + + results := make(chan *ddbrew.BatchResult) + done := make(chan struct{}) + remainCount := int64(0) + + writer := ddbrew.BatchWriter{ + Table: &table, + File: f, + DDBAction: opt.Action, + LimitUnit: limitUnit, + RemainCount: &remainCount, + Results: results, + Done: done, + } + + err = writer.BatchWrite(ctx) + if err != nil { + return err + } + + successNum, unprocessedNum := 0, 0 + var unprocessedRecordFile *os.File + isUnprocessed := false + + for { + select { + case result := <-results: + atomic.AddInt64(&remainCount, -1) + + successNum += result.Content.SuccessCount + unprocessedNum += len(result.Content.UnprocessedRecord) + progress := int(float64(successNum) / float64(lines) * 100) + + if !isUnprocessed && len(result.Content.UnprocessedRecord) > 0 { + isUnprocessed = true + + ufile := fmt.Sprintf("unprocessed_record_%s_%s.jsonl", + opt.TableName, + time.Now().Format("20060102-150405")) + + unprocessedRecordFile, err = os.Create(ufile) + if err != nil { + return err + } + defer unprocessedRecordFile.Close() + } + + if len(result.Content.UnprocessedRecord) > 0 { + for _, record := range result.Content.UnprocessedRecord { + unprocessedRecordFile.Write([]byte(record + "\n")) + } + } + + if isUnprocessed { + fmt.Fprintf(os.Stderr, "\rSuccess: %d(%d%%) Unprocessed(%s): %d", + successNum, + progress, + unprocessedRecordFile.Name(), + unprocessedNum) + } else { + fmt.Fprintf(os.Stderr, "\rSuccess: %d(%d%%)", successNum, progress) + } + + if result.Error != nil { + return result.Error + } + case <-done: + if remainCount == 0 { + return nil + } + } + } +} diff --git a/client.go b/client.go index cf10cc1..a9bf4de 100644 --- a/client.go +++ b/client.go @@ -65,44 +65,42 @@ type BatchWriteOutput struct { UnprocessedRecord []string } -func (d *DDBClient) BatchWrite(ctx context.Context, req BatchRequest) (*BatchWriteOutput, error) { +func (d *DDBClient) BatchWrite(ctx context.Context, req BatchRequest) (output BatchWriteOutput, err error) { res, err := d.BatchWriteItem(ctx, req.BatchWriteItemInput()) if err != nil { - return nil, err + return output, err } - result := &BatchWriteOutput{} - if err == nil { - if res != nil && len(res.UnprocessedItems[req.TableName]) > 0 { - for _, item := range res.UnprocessedItems[req.TableName] { - parsedJl := map[string]interface{}{} - - if item.PutRequest != nil { - err = attributevalue.UnmarshalMap(item.PutRequest.Item, &parsedJl) - if err != nil { - continue - } + if len(res.UnprocessedItems[req.TableName]) > 0 { + for _, item := range res.UnprocessedItems[req.TableName] { + parsedJl := map[string]interface{}{} + if item.PutRequest != nil { + err = attributevalue.UnmarshalMap(item.PutRequest.Item, &parsedJl) + if err != nil { + continue } - if item.DeleteRequest != nil { - err = attributevalue.UnmarshalMap(item.DeleteRequest.Key, &parsedJl) - if err != nil { - continue - } - } + } - jsonByte, err := json.Marshal(parsedJl) + if item.DeleteRequest != nil { + err = attributevalue.UnmarshalMap(item.DeleteRequest.Key, &parsedJl) if err != nil { - return nil, err + continue } + } - result.UnprocessedRecord = append(result.UnprocessedRecord, string(jsonByte)) + jsonByte, err := json.Marshal(parsedJl) + if err != nil { + return output, err } + + output.UnprocessedRecord = append(output.UnprocessedRecord, string(jsonByte)) } } - result.SuccessCount = req.Number() - len(result.UnprocessedRecord) - return result, nil + output.SuccessCount = req.Number() - len(output.UnprocessedRecord) + + return output, nil } diff --git a/cmd/ddbrew/main.go b/cmd/ddbrew/main.go index 5f5cb21..6967bc1 100644 --- a/cmd/ddbrew/main.go +++ b/cmd/ddbrew/main.go @@ -60,18 +60,20 @@ func main() { Limit: CLI.Backup.Limit, }) case "restore ": - cmdErrCh <- cli.Restore(ctx, &cli.RestoreOption{ + cmdErrCh <- cli.Write(ctx, &cli.WriteOption{ TableName: CLI.Restore.TableName, FilePath: CLI.Restore.File, DryRun: CLI.Restore.DryRun, Limit: CLI.Restore.Limit, + Action: ddbrew.DDB_ACTION_PUT, }) case "delete ": - cmdErrCh <- cli.Delete(ctx, &cli.DeleteOption{ + cmdErrCh <- cli.Write(ctx, &cli.WriteOption{ TableName: CLI.Delete.TableName, FilePath: CLI.Delete.File, DryRun: CLI.Delete.DryRun, Limit: CLI.Delete.Limit, + Action: ddbrew.DDB_ACTION_DELETE, }) } }() diff --git a/delete.go b/delete.go deleted file mode 100644 index f45f382..0000000 --- a/delete.go +++ /dev/null @@ -1,21 +0,0 @@ -package ddbrew - -import ( - "context" - "os" -) - -type DeleteOption struct { - Table *Table - File *os.File - LimitUnit *int -} - -func Delete(ctx context.Context, opt *DeleteOption) error { - return batchWriteWithConsle(&BatchWriteConsoleOpt{ - Table: opt.Table, - File: opt.File, - DDBAction: DDB_ACTION_DELETE, - LimitUnit: opt.LimitUnit, - }) -} diff --git a/restore.go b/restore.go deleted file mode 100644 index ee1e889..0000000 --- a/restore.go +++ /dev/null @@ -1,21 +0,0 @@ -package ddbrew - -import ( - "context" - "os" -) - -type RestoreOption struct { - Table *Table - File *os.File - LimitUnit *int -} - -func Restore(ctx context.Context, opt *RestoreOption) error { - return batchWriteWithConsle(&BatchWriteConsoleOpt{ - Table: opt.Table, - File: opt.File, - DDBAction: DDB_ACTION_PUT, - LimitUnit: opt.LimitUnit, - }) -} diff --git a/worker.go b/worker.go index aaa05e5..e52cd74 100644 --- a/worker.go +++ b/worker.go @@ -4,9 +4,9 @@ import ( "context" ) -func worker(reqs <-chan BatchRequest, results chan<- *BatchResult) { +func worker(ctx context.Context, reqs <-chan BatchRequest, results chan<- *BatchResult) { for req := range reqs { - res, err := DdbClient.BatchWrite(context.TODO(), req) + res, err := DdbClient.BatchWrite(ctx, req) results <- &BatchResult{ Content: res,