-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathchunk.go
95 lines (84 loc) · 2.32 KB
/
chunk.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package pump
import (
"github.com/google/gofountain"
"math"
)
type FountainBlock = fountain.LTBlock
type Chunk struct {
Object Object
Size int64
Offset int64
PacketSize int64
}
type chunkEncoder struct {
chunk Chunk
encoder fountain.Codec
data []byte
symbolCache map[int64]FountainBlock
}
func (ce *chunkEncoder) generatePacket(blockIndex int64) Packet {
if _, available := ce.symbolCache[blockIndex]; !available {
idsToBuild := buildRange(blockIndex, blockIndex+ce.chunk.sourceBlockCount())
blocks := fountain.EncodeLTBlocks(ce.copyOfData(), idsToBuild, ce.encoder)
ce.symbolCache = make(map[int64]FountainBlock)
for _, block := range blocks {
ce.symbolCache[block.BlockCode] = block
}
}
return Packet{Chunk: ce.chunk, Block: ce.symbolCache[blockIndex]}
}
func (ce *chunkEncoder) copyOfData() []byte {
dataCopy := make([]byte, len(ce.data))
copy(dataCopy, ce.data)
return dataCopy
}
type chunkDecoder struct {
chunk Chunk
decoder fountain.Decoder
complete bool
}
func (cd *chunkDecoder) ingest(packet Packet) (finished bool) {
if cd.complete {
return // because adding blocks to completed decoder will corrupt it
}
finished = cd.decoder.AddBlocks([]FountainBlock{packet.Block})
if finished {
cd.complete = true
}
return
}
func (cd *chunkDecoder) data() []byte {
return cd.decoder.Decode()[:cd.chunk.Size]
}
func (c Chunk) sourceBlockCount() int64 {
return int64(math.Ceil(float64(c.alignedSourceBlockSize()) / float64(c.PacketSize)))
}
func (c Chunk) decoder() *chunkDecoder {
return &chunkDecoder{
chunk: c,
decoder: c.codec().NewDecoder(int(c.alignedSourceBlockSize())),
}
}
func (c Chunk) codec() fountain.Codec {
return fountain.NewRaptorCodec(int(c.sourceBlockCount()), 8)
}
func (c Chunk) valid() bool {
return c.sourceBlockCount() <= 8100
}
func (c Chunk) encoder(data []byte) (encoder *chunkEncoder) {
return &chunkEncoder{
encoder: c.codec(),
chunk: c,
data: append(data, c.padding()...),
symbolCache: make(map[int64]FountainBlock),
}
}
func (c Chunk) alignedSourceBlockSize() int64 {
return c.PacketSize * int64(math.Ceil(float64(c.Size)/float64(c.PacketSize)))
}
func (c Chunk) buildIds() []int64 {
return buildRange(0, c.sourceBlockCount()*3)
}
func (c Chunk) padding() []byte {
return make([]byte, c.alignedSourceBlockSize()-c.Size)
}