diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml new file mode 100644 index 0000000..5dc178a --- /dev/null +++ b/.github/workflows/build.yaml @@ -0,0 +1,31 @@ +name: Build +# Need GitHub secret: DOCKER_HUB_USER, DOCKER_HUB_SECRETS, GHCR_TOKEN + +on: + push: + branches: + - master + +jobs: + UnitTest: + name: Test + runs-on: ubuntu-20.04 + steps: + - name: Set up Go 1.19 + uses: actions/setup-go@v2.1.3 + with: + go-version: 1.19 + id: go + - name: Check out code into the Go module directory + uses: actions/checkout@v2.3.4 + - name: Test + run: | + make test + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v1 + with: + token: ${{ secrets.CODECOV_TOKEN }} + files: coverage.out + flags: unittests + name: codecov-umbrella + fail_ci_if_error: true diff --git a/.github/workflows/pull-request.yaml b/.github/workflows/pull-request.yaml index 46b8e64..71ad9ee 100644 --- a/.github/workflows/pull-request.yaml +++ b/.github/workflows/pull-request.yaml @@ -10,10 +10,10 @@ jobs: name: Build runs-on: ubuntu-20.04 steps: - - name: Set up Go 1.16 + - name: Set up Go 1.19 uses: actions/setup-go@v3 with: - go-version: 1.16 + go-version: 1.19 id: go - name: Check out code into the Go module directory uses: actions/checkout@v3.0.0 @@ -33,3 +33,14 @@ jobs: - name: Test run: | make test + - name: Test + run: | + make test + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v1 + with: + token: ${{ secrets.CODECOV_TOKEN }} + files: coverage.out + flags: unittests + name: codecov-umbrella + fail_ci_if_error: true diff --git a/Makefile b/Makefile index c69b60e..a6f4ac5 100644 --- a/Makefile +++ b/Makefile @@ -10,4 +10,4 @@ build-linux: build-all: build-darwin build-linux build-win test: - go test ./... + go test ./... -coverprofile coverage.out diff --git a/README.md b/README.md index ba93621..65bf942 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,6 @@ -Data transfer via UDP protocal. +[![codecov](https://codecov.io/gh/LinuxSuRen/transfer/branch/master/graph/badge.svg?token=XS8g2CjdNL)](https://codecov.io/gh/LinuxSuRen/transfer) + +Data transfer via UDP protocol. ## Features * Send files to an unknown target IP address in a local network @@ -22,4 +24,4 @@ transfer send targetFile [ip] ``` ## Limitations -* Not stable when sending data from MacOS +* Not fast enough (6.45 MB/s) when sending data from macOS diff --git a/main.go b/main.go index d259874..fb9dd00 100644 --- a/main.go +++ b/main.go @@ -27,7 +27,7 @@ func retry(count int, callback func() error) (err error) { break } // mainly do this on the darwin - time.Sleep(100 * time.Millisecond) + time.Sleep(500 * time.Millisecond) } return } diff --git a/queue.go b/queue.go new file mode 100644 index 0000000..986baab --- /dev/null +++ b/queue.go @@ -0,0 +1,28 @@ +package main + +import "sync" + +type Queue struct { + sync.Mutex + data []int +} + +func (q *Queue) Push(value int) { + q.Lock() + defer q.Unlock() + q.data = append(q.data, value) +} + +func (q *Queue) Pop() (item *int) { + q.Lock() + defer q.Unlock() + if len(q.data) > 0 { + item = &q.data[0] + q.data = q.data[1:] + } + return +} + +func (q *Queue) Size() int { + return len(q.data) +} diff --git a/queue_test.go b/queue_test.go new file mode 100644 index 0000000..f0bfc94 --- /dev/null +++ b/queue_test.go @@ -0,0 +1,43 @@ +package main + +import ( + "github.com/stretchr/testify/assert" + "sync" + "testing" +) + +func TestQueue(t *testing.T) { + queue := &Queue{} + + val := queue.Pop() + assert.Nil(t, val) + + queue.Push(1) + val = queue.Pop() + assert.NotNil(t, val) + assert.Equal(t, 1, *val) + + // multi-threads case + wg := sync.WaitGroup{} + threadCounts := 1000 + for i := 0; i < threadCounts; i++ { + wg.Add(1) + go func() { + defer wg.Done() + queue.Push(1) + }() + } + wg.Wait() + assert.Equal(t, threadCounts, queue.Size()) + + // Pop case + for i := 0; i < threadCounts; i++ { + wg.Add(1) + go func() { + defer wg.Done() + queue.Pop() + }() + } + wg.Wait() + assert.Equal(t, 0, queue.Size()) +} diff --git a/safe_map.go b/safe_map.go new file mode 100644 index 0000000..cdd8572 --- /dev/null +++ b/safe_map.go @@ -0,0 +1,79 @@ +package main + +import "sync" + +type SafeMap struct { + sync.Mutex + data map[int]string +} + +func NewSafeMap(count int) (safeMap *SafeMap) { + safeMap = &SafeMap{data: make(map[int]string, count)} + for i := 0; i < count; i++ { + safeMap.Put(i, "") + } + return +} + +func (m *SafeMap) Put(k int, val string) { + m.Lock() + defer m.Unlock() + m.data[k] = val +} + +func (m *SafeMap) Remove(k int) { + m.Lock() + defer m.Unlock() + delete(m.data, k) +} + +func (m *SafeMap) Get() *int { + m.Lock() + defer m.Unlock() + for k, _ := range m.data { + return &k + } + return nil +} + +func (m *SafeMap) GetKeys() (result []int) { + m.Lock() + defer m.Unlock() + for k, _ := range m.data { + result = append(result, k) + } + return +} + +func (m *SafeMap) GetAndRemove() (target *int) { + m.Lock() + defer m.Unlock() + for k, _ := range m.data { + target = &k + break + } + if target != nil { + delete(m.data, *target) + } + return +} + +func (m *SafeMap) GetLowestAndRemove() (target *int) { + m.Lock() + defer m.Unlock() + for k := range m.data { + if target == nil || k < *target { + target = &k + } + } + if target != nil { + delete(m.data, *target) + } + return +} + +func (m *SafeMap) Size() int { + m.Lock() + defer m.Unlock() + return len(m.data) +} diff --git a/safe_map_test.go b/safe_map_test.go new file mode 100644 index 0000000..f263618 --- /dev/null +++ b/safe_map_test.go @@ -0,0 +1,35 @@ +package main + +import ( + "github.com/stretchr/testify/assert" + "sync" + "testing" +) + +func TestSafeMap(t *testing.T) { + safeMap := NewSafeMap(1000) + assert.NotNil(t, safeMap) + assert.Equal(t, 1000, safeMap.Size()) + + wg := sync.WaitGroup{} + for i := 0; i < 500; i++ { + wg.Add(1) + go func(k int) { + defer wg.Done() + + safeMap.Remove(k) + }(i) + } + wg.Wait() + assert.Equal(t, 500, safeMap.Size()) + + for i := 0; i < 500; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _ = safeMap.GetAndRemove() + }() + } + wg.Wait() + assert.Equal(t, 0, safeMap.Size()) +} diff --git a/send.go b/send.go index 7584bb6..adf38a8 100644 --- a/send.go +++ b/send.go @@ -10,6 +10,8 @@ import ( "regexp" "strconv" "strings" + "sync" + "sync/atomic" "time" ) @@ -98,7 +100,7 @@ func (o *sendOption) runE(cmd *cobra.Command, args []string) (err error) { return } - err = retry(10, func() error { + err = retry(30, func() error { // no buffer space available might happen on darwin _, err := conn.Write(builder.CreateHeader(i, buf[:n])) return err @@ -111,47 +113,54 @@ func (o *sendOption) runE(cmd *cobra.Command, args []string) (err error) { } cmd.Println("all the data was sent, try to wait for the missing data") - checking := true - go func(checking *bool) { + mapBuffer := NewSafeMap(0) + ck := atomic.Bool{} + ck.Store(true) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() cmd.Print("checking") - for *checking { - select { - case <-time.After(time.Second * 5): - cmd.Print(".") + + for index := mapBuffer.GetLowestAndRemove(); ck.Load(); index = mapBuffer.GetLowestAndRemove() { + if index != nil { + //fmt.Println("send", *index) + _ = send(f, reader, conn, *index, chunk, builder) + } else { + fmt.Print(".") + time.Sleep(time.Second * 3) } } - cmd.Println() - }(&checking) + fmt.Println() + }() - for checking { + for ck.Load() { var index int var ok bool if index, ok, err = waitingMissing(conn); ok { if index == -1 { - checking = false + ck.Store(false) } else { - if err = send(f, reader, conn, index, chunk, builder); err != nil { - return err - } + //fmt.Println("got missing", index) + mapBuffer.Put(index, "") } } else if err != nil { + fmt.Println(err) if match, _ := regexp.MatchString(".*connection refused.*", err.Error()); match { + time.Sleep(time.Second * 2) + if conn, err = net.Dial("udp", fmt.Sprintf("%s:%d", o.ip, o.port)); err != nil { + fmt.Println(err) return } - } else if match, _ := regexp.MatchString(".*i/o timeout.*", err.Error()); match { - if conn, err = net.Dial("udp", fmt.Sprintf("%s:%d", o.ip, o.port)); err != nil { - return + if err = send(f, reader, conn, 0, chunk, builder); err != nil { + return err } } - - time.Sleep(time.Second * 2) - if err = send(f, reader, conn, 0, chunk, builder); err != nil { - return err - } } } + wg.Wait() endTime := time.Now() cmd.Println("sent over with", endTime.Sub(beginTime).Seconds()) return @@ -168,7 +177,7 @@ func send(f *os.File, reader *bufio.Reader, conn net.Conn, index, chunk int, bui return } - _ = retry(10, func() error { + err = retry(30, func() error { _, err := conn.Write(builder.CreateHeader(index, buf[:n])) return err }) @@ -182,9 +191,9 @@ func waitingMissing(conn net.Conn) (index int, ok bool, err error) { message := make([]byte, 14) var rlen int - if err = conn.SetReadDeadline(time.Now().Add(time.Second * 3)); err != nil { - return - } + //if err = conn.SetReadDeadline(time.Now().Add(time.Second * 3)); err != nil { + // return + //} if rlen, err = conn.Read(message[:]); err == nil && rlen == 14 { index, ok = checkMissing(message) diff --git a/wait.go b/wait.go index addafe5..927f0c5 100644 --- a/wait.go +++ b/wait.go @@ -70,10 +70,12 @@ func broadcast(ctx context.Context, ip net.IP) { } func (o *waitOption) runE(cmd *cobra.Command, args []string) error { - conn, err := net.ListenUDP("udp", &net.UDPAddr{ + udpAddress := &net.UDPAddr{ Port: o.port, IP: net.ParseIP(o.listen), - }) + } + + conn, err := net.ListenUDP("udp", udpAddress) if err != nil { return err } @@ -94,72 +96,81 @@ func (o *waitOption) runE(cmd *cobra.Command, args []string) error { if err != nil { return err } + defer func() { + _ = f.Close() + }() if _, err = f.Write(make([]byte, header.length)); err != nil { err = fmt.Errorf("failed to init file, %v", err) return err } - buffer := make([]byte, header.count) - buffer[header.index] = 1 - f.WriteAt(header.data, int64(header.chrunk*header.index)) + mapBuffer := NewSafeMap(header.count) + go func() { + if _, err := f.WriteAt(header.data, int64(header.chrunk*header.index)); err == nil { + mapBuffer.Remove(header.index) + } + }() wg := sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() - checking := true - for checking { - done := true - for i := 0; i < header.count; i++ { - if buffer[i] != 1 { - done = false - - header, err := readHeader(conn) + //startedMissingThread := false + for size := mapBuffer.Size(); size > 0; size = mapBuffer.Size() { + header, err = readHeader(conn) + if err == nil { + go func(header dataHeader) { + _, err = f.WriteAt(header.data, int64(header.chrunk*header.index)) if err == nil { - _, err = f.WriteAt(header.data, int64(header.chrunk*header.index)) - if err == nil { - buffer[header.index] = 1 - } + mapBuffer.Remove(header.index) + } else { + fmt.Println(err) } - } - } - if done { - checking = false + }(header) } } }() + // check the buffer and start the missing thread + lastCount := 0 + time.Sleep(time.Microsecond * time.Duration(lastCount) * 50) + for lastCount != mapBuffer.Size() { + lastCount = mapBuffer.Size() + time.Sleep(time.Second * 5) + } + sendWaitingMissingRequest(&wg, &header, mapBuffer, conn) + + wg.Wait() + cmd.Println("wrote to file", f.Name()) + return nil +} + +func sendWaitingMissingRequest(wg *sync.WaitGroup, header *dataHeader, buffer *SafeMap, conn *net.UDPConn) { wg.Add(1) go func() { - defer wg.Done() - - checking := true - for checking { - done := true - for i := 0; i < header.count; i++ { - if buffer[i] != 1 { - done = false - requestMissing(conn, i, header.remote) - time.Sleep(time.Millisecond * 10) - break + defer func() { + _ = conn.Close() + wg.Done() + }() + + for buffer.Size() > 0 { + missing := buffer.GetKeys() + //fmt.Println("missing", len(missing)) + for _, i := range missing { + err := requestMissing(conn, i, header.remote) + if err != nil { + fmt.Println(err) } } + time.Sleep(time.Second) + } - if done { - requestDone(conn, header.remote) - checking = false - } + for err := requestDone(conn, header.remote); err != nil; { } fmt.Println("done with checking") }() - - wg.Wait() - f.Close() - - cmd.Println("wrote to file", f.Name()) - return nil } func requestDone(conn *net.UDPConn, remote *net.UDPAddr) (err error) {