Skip to content

Commit

Permalink
Improve the performance when sending on macOS
Browse files Browse the repository at this point in the history
  • Loading branch information
LinuxSuRen committed Sep 4, 2022
1 parent cb262e1 commit 18ff216
Show file tree
Hide file tree
Showing 11 changed files with 323 additions and 74 deletions.
31 changes: 31 additions & 0 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
name: Build
# Need GitHub secret: DOCKER_HUB_USER, DOCKER_HUB_SECRETS, GHCR_TOKEN

on:
push:
branches:
- master

jobs:
UnitTest:
name: Test
runs-on: ubuntu-20.04
steps:
- name: Set up Go 1.19
uses: actions/setup-go@v2.1.3
with:
go-version: 1.19
id: go
- name: Check out code into the Go module directory
uses: actions/checkout@v2.3.4
- name: Test
run: |
make test
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v1
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: coverage.out
flags: unittests
name: codecov-umbrella
fail_ci_if_error: true
15 changes: 13 additions & 2 deletions .github/workflows/pull-request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ jobs:
name: Build
runs-on: ubuntu-20.04
steps:
- name: Set up Go 1.16
- name: Set up Go 1.19
uses: actions/setup-go@v3
with:
go-version: 1.16
go-version: 1.19
id: go
- name: Check out code into the Go module directory
uses: actions/checkout@v3.0.0
Expand All @@ -33,3 +33,14 @@ jobs:
- name: Test
run: |
make test
- name: Test
run: |
make test
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v1
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: coverage.out
flags: unittests
name: codecov-umbrella
fail_ci_if_error: true
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ build-linux:
build-all: build-darwin build-linux build-win

test:
go test ./...
go test ./... -coverprofile coverage.out
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
Data transfer via UDP protocal.
[![codecov](https://codecov.io/gh/LinuxSuRen/transfer/branch/master/graph/badge.svg?token=XS8g2CjdNL)](https://codecov.io/gh/LinuxSuRen/transfer)

Data transfer via UDP protocol.

## Features
* Send files to an unknown target IP address in a local network
Expand All @@ -22,4 +24,4 @@ transfer send targetFile [ip]
```

## Limitations
* Not stable when sending data from MacOS
* Not fast enough (6.45 MB/s) when sending data from macOS
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func retry(count int, callback func() error) (err error) {
break
}
// mainly do this on the darwin
time.Sleep(100 * time.Millisecond)
time.Sleep(500 * time.Millisecond)
}
return
}
Expand Down
28 changes: 28 additions & 0 deletions queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package main

import "sync"

type Queue struct {
sync.Mutex
data []int
}

func (q *Queue) Push(value int) {
q.Lock()
defer q.Unlock()
q.data = append(q.data, value)
}

func (q *Queue) Pop() (item *int) {
q.Lock()
defer q.Unlock()
if len(q.data) > 0 {
item = &q.data[0]
q.data = q.data[1:]
}
return
}

func (q *Queue) Size() int {
return len(q.data)
}
43 changes: 43 additions & 0 deletions queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package main

import (
"github.com/stretchr/testify/assert"
"sync"
"testing"
)

func TestQueue(t *testing.T) {
queue := &Queue{}

val := queue.Pop()
assert.Nil(t, val)

queue.Push(1)
val = queue.Pop()
assert.NotNil(t, val)
assert.Equal(t, 1, *val)

// multi-threads case
wg := sync.WaitGroup{}
threadCounts := 1000
for i := 0; i < threadCounts; i++ {
wg.Add(1)
go func() {
defer wg.Done()
queue.Push(1)
}()
}
wg.Wait()
assert.Equal(t, threadCounts, queue.Size())

// Pop case
for i := 0; i < threadCounts; i++ {
wg.Add(1)
go func() {
defer wg.Done()
queue.Pop()
}()
}
wg.Wait()
assert.Equal(t, 0, queue.Size())
}
79 changes: 79 additions & 0 deletions safe_map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package main

import "sync"

type SafeMap struct {
sync.Mutex
data map[int]string
}

func NewSafeMap(count int) (safeMap *SafeMap) {
safeMap = &SafeMap{data: make(map[int]string, count)}
for i := 0; i < count; i++ {
safeMap.Put(i, "")
}
return
}

func (m *SafeMap) Put(k int, val string) {
m.Lock()
defer m.Unlock()
m.data[k] = val
}

func (m *SafeMap) Remove(k int) {
m.Lock()
defer m.Unlock()
delete(m.data, k)
}

func (m *SafeMap) Get() *int {
m.Lock()
defer m.Unlock()
for k, _ := range m.data {
return &k
}
return nil
}

func (m *SafeMap) GetKeys() (result []int) {
m.Lock()
defer m.Unlock()
for k, _ := range m.data {
result = append(result, k)
}
return
}

func (m *SafeMap) GetAndRemove() (target *int) {
m.Lock()
defer m.Unlock()
for k, _ := range m.data {
target = &k
break
}
if target != nil {
delete(m.data, *target)
}
return
}

func (m *SafeMap) GetLowestAndRemove() (target *int) {
m.Lock()
defer m.Unlock()
for k := range m.data {
if target == nil || k < *target {
target = &k
}
}
if target != nil {
delete(m.data, *target)
}
return
}

func (m *SafeMap) Size() int {
m.Lock()
defer m.Unlock()
return len(m.data)
}
35 changes: 35 additions & 0 deletions safe_map_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package main

import (
"github.com/stretchr/testify/assert"
"sync"
"testing"
)

func TestSafeMap(t *testing.T) {
safeMap := NewSafeMap(1000)
assert.NotNil(t, safeMap)
assert.Equal(t, 1000, safeMap.Size())

wg := sync.WaitGroup{}
for i := 0; i < 500; i++ {
wg.Add(1)
go func(k int) {
defer wg.Done()

safeMap.Remove(k)
}(i)
}
wg.Wait()
assert.Equal(t, 500, safeMap.Size())

for i := 0; i < 500; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_ = safeMap.GetAndRemove()
}()
}
wg.Wait()
assert.Equal(t, 0, safeMap.Size())
}
61 changes: 35 additions & 26 deletions send.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -98,7 +100,7 @@ func (o *sendOption) runE(cmd *cobra.Command, args []string) (err error) {
return
}

err = retry(10, func() error {
err = retry(30, func() error {
// no buffer space available might happen on darwin
_, err := conn.Write(builder.CreateHeader(i, buf[:n]))
return err
Expand All @@ -111,47 +113,54 @@ func (o *sendOption) runE(cmd *cobra.Command, args []string) (err error) {
}
cmd.Println("all the data was sent, try to wait for the missing data")

checking := true
go func(checking *bool) {
mapBuffer := NewSafeMap(0)
ck := atomic.Bool{}
ck.Store(true)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
cmd.Print("checking")
for *checking {
select {
case <-time.After(time.Second * 5):
cmd.Print(".")

for index := mapBuffer.GetLowestAndRemove(); ck.Load(); index = mapBuffer.GetLowestAndRemove() {
if index != nil {
//fmt.Println("send", *index)
_ = send(f, reader, conn, *index, chunk, builder)
} else {
fmt.Print(".")
time.Sleep(time.Second * 3)
}
}
cmd.Println()
}(&checking)
fmt.Println()
}()

for checking {
for ck.Load() {
var index int
var ok bool
if index, ok, err = waitingMissing(conn); ok {
if index == -1 {
checking = false
ck.Store(false)
} else {
if err = send(f, reader, conn, index, chunk, builder); err != nil {
return err
}
//fmt.Println("got missing", index)
mapBuffer.Put(index, "")
}
} else if err != nil {
fmt.Println(err)
if match, _ := regexp.MatchString(".*connection refused.*", err.Error()); match {
time.Sleep(time.Second * 2)

if conn, err = net.Dial("udp", fmt.Sprintf("%s:%d", o.ip, o.port)); err != nil {
fmt.Println(err)
return
}
} else if match, _ := regexp.MatchString(".*i/o timeout.*", err.Error()); match {
if conn, err = net.Dial("udp", fmt.Sprintf("%s:%d", o.ip, o.port)); err != nil {
return
if err = send(f, reader, conn, 0, chunk, builder); err != nil {
return err
}
}

time.Sleep(time.Second * 2)
if err = send(f, reader, conn, 0, chunk, builder); err != nil {
return err
}
}
}

wg.Wait()
endTime := time.Now()
cmd.Println("sent over with", endTime.Sub(beginTime).Seconds())
return
Expand All @@ -168,7 +177,7 @@ func send(f *os.File, reader *bufio.Reader, conn net.Conn, index, chunk int, bui
return
}

_ = retry(10, func() error {
err = retry(30, func() error {
_, err := conn.Write(builder.CreateHeader(index, buf[:n]))
return err
})
Expand All @@ -182,9 +191,9 @@ func waitingMissing(conn net.Conn) (index int, ok bool, err error) {
message := make([]byte, 14)

var rlen int
if err = conn.SetReadDeadline(time.Now().Add(time.Second * 3)); err != nil {
return
}
//if err = conn.SetReadDeadline(time.Now().Add(time.Second * 3)); err != nil {
// return
//}

if rlen, err = conn.Read(message[:]); err == nil && rlen == 14 {
index, ok = checkMissing(message)
Expand Down
Loading

0 comments on commit 18ff216

Please sign in to comment.