Skip to content

Commit

Permalink
refactor(batch): commonize delete and restore
Browse files Browse the repository at this point in the history
  • Loading branch information
shuntaka9576 committed Jun 5, 2022
1 parent f1a643e commit 2817080
Show file tree
Hide file tree
Showing 10 changed files with 243 additions and 232 deletions.
120 changes: 53 additions & 67 deletions batch.go
Original file line number Diff line number Diff line change
@@ -1,117 +1,103 @@
package ddbrew

import (
"fmt"
"context"
"os"
"runtime"
"sync/atomic"
"time"
)

type BatchWriteConsoleOpt struct {
Table *Table
File *os.File
DDBAction DDBAction
LimitUnit *int
type BatchWriter struct {
Table *Table
File *os.File
DDBAction DDBAction
LimitUnit *int
RemainCount *int64
Results chan *BatchResult
Done chan struct{}
}

type BatchResult struct {
Content *BatchWriteOutput
Content BatchWriteOutput
Error error
}

func batchWriteWithConsle(opt *BatchWriteConsoleOpt) error {
func (b *BatchWriter) BatchWrite(ctx context.Context) error {
generator := BatchRequestGenerator{}
generator.Init(&BatchRequestGeneratorOption{
File: opt.File,
Action: opt.DDBAction,
Table: opt.Table,
File: b.File,
Action: b.DDBAction,
Table: b.Table,
})

results := make(chan *BatchResult)
done := make(chan struct{})
reqs := make(chan BatchRequest)

procs := runtime.NumCPU()

for i := 0; i < procs; i++ {
go worker(reqs, results)
go worker(ctx, reqs, b.Results)
}

var remainCount int64 = 0

if opt.LimitUnit == nil {
if b.LimitUnit == nil {
go func() {
LOOP:
for {
batchReq, err := generator.generate(0)
if err != nil {
if batchReq.Number() == 0 && err == ErrBatchEOF {
close(done)
select {
case <-ctx.Done():
break LOOP
default:
batchReq, err := generator.generate(0)
if err != nil {
if batchReq.Number() == 0 && err == ErrBatchEOF {
close(b.Done)

break
break LOOP
}
}
reqs <- batchReq
atomic.AddInt64(b.RemainCount, 1)
}
reqs <- batchReq
atomic.AddInt64(&remainCount, 1)
}
}()
} else {
go func() {
ticker := time.NewTicker(1 * time.Second)

LOOP:
for {
<-ticker.C
limit := *opt.LimitUnit
select {
case <-ctx.Done():
break LOOP
case <-ticker.C:
limit := *b.LimitUnit

for {
reqUnitSize := DEFAULT_UNIT_SIZE
if limit < reqUnitSize {
reqUnitSize = limit
}

for {
reqUnitSize := DEFAULT_UNIT_SIZE
if limit < reqUnitSize {
reqUnitSize = limit
}
batchReq, err := generator.generate(reqUnitSize)
if err != nil {
if batchReq.Number() == 0 && err == ErrBatchEOF {
close(b.Done)

batchReq, err := generator.generate(reqUnitSize)
if err != nil {
if batchReq.Number() == 0 && err == ErrBatchEOF {
close(done)
break LOOP
}
}
reqs <- batchReq
atomic.AddInt64(b.RemainCount, 1)

limit -= batchReq.totalWU
if limit == 0 {
break
}
}
reqs <- batchReq
atomic.AddInt64(&remainCount, 1)

limit -= batchReq.totalWU
if limit == 0 {
break
}
}
}
}()
}

successNum, unprocessedNum := 0, 0
for {
select {
case result := <-results:
if result != nil {
atomic.AddInt64(&remainCount, -1)

if result.Content != nil {
successNum += result.Content.SuccessCount
unprocessedNum += len(result.Content.UnprocessedRecord)

fmt.Printf("\r%d, %d", successNum, unprocessedNum)
}

if result.Error != nil {
return result.Error
}
}
case <-done:
if remainCount == 0 {
return nil
}
}
}

return nil
}
51 changes: 0 additions & 51 deletions cli/delete.go

This file was deleted.

27 changes: 27 additions & 0 deletions cli/file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package cli

import (
"bufio"
"io"
"os"
)

func countLines(f *os.File) (lines int) {
defer f.Seek(0, 0)

re := bufio.NewReader(f)

for {
_, err := re.ReadString('\n')
if err != nil {
if err == io.EOF {

break
}
}
lines += 1
}

return lines

}
44 changes: 0 additions & 44 deletions cli/restore.go

This file was deleted.

Loading

0 comments on commit 2817080

Please sign in to comment.